fix(python-cdk): add user friendly message for encoding errors (#44438)
Co-authored-by: Alexandre Girard <alexandre@airbyte.io>
strosek and girarda authored Aug 28, 2024
1 parent dc6a1cc commit fc8cd5a
Showing 9 changed files with 67 additions and 16 deletions.
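At its core, the fix turns a low-level UnicodeError raised while reading CSV headers into an AirbyteTracedException carrying the new ENCODING_ERROR text, and the surrounding layers now let that exception propagate instead of re-wrapping it. A condensed sketch of the wrapping, using an illustrative helper that is not part of the CDK:

```python
from airbyte_cdk import AirbyteTracedException

ENCODING_ERROR = "File encoding error. The configured encoding must match file encoding."

def read_headers(fp, configured_encoding: str) -> list[str]:
    # Illustrative stand-in for the parser's header read: a decode failure
    # becomes a traced exception with the user-facing message.
    try:
        return fp.readline().strip().split(",")
    except UnicodeError:
        raise AirbyteTracedException(
            message=f"{ENCODING_ERROR} Expected encoding: {configured_encoding}",
        )
```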
@@ -6,6 +6,7 @@
import traceback
from typing import TYPE_CHECKING, Optional, Tuple

from airbyte_cdk import AirbyteTracedException
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.file_based.availability_strategy import AbstractFileBasedAvailabilityStrategy
from airbyte_cdk.sources.file_based.exceptions import CheckAvailabilityError, CustomFileBasedException, FileBasedSourceError
@@ -66,6 +67,8 @@ def check_availability_and_parsability(
# If the parser is set to not check parsability, we still want to check that we can open the file.
handle = stream.stream_reader.open_file(file, parser.file_read_mode, None, logger)
handle.close()
except AirbyteTracedException as ate:
raise ate
except CheckAvailabilityError:
return False, "".join(traceback.format_exc())

@@ -98,6 +101,8 @@ def _check_parse_record(self, stream: "AbstractFileBasedStream", file: RemoteFil
# consider the connection check successful even though it means
# we skip the schema validation check.
return
except AirbyteTracedException as ate:
raise ate
except Exception as exc:
raise CheckAvailabilityError(FileBasedSourceError.ERROR_READING_FILE, stream=stream.name, file=file.uri) from exc

@@ -14,6 +14,7 @@ class FileBasedSourceError(Enum):
GLOB_PARSE_ERROR = (
"Error parsing glob pattern. Please refer to the glob pattern rules at https://facelessuser.github.io/wcmatch/glob/#split."
)
ENCODING_ERROR = "File encoding error. The configured encoding must match file encoding."
ERROR_CASTING_VALUE = "Could not cast the value to the expected type."
ERROR_CASTING_VALUE_UNRECOGNIZED_TYPE = "Could not cast the value to the expected type because the type is not recognized. Valid types are null, array, boolean, integer, number, object, and string."
ERROR_DECODING_VALUE = "Expected a JSON-decodeable value but could not decode record."
@@ -122,6 +122,7 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
)

errors = []
tracebacks = []
for stream in streams:
if not isinstance(stream, AbstractFileBasedStream):
raise ValueError(f"Stream {stream} is not a file-based stream.")
@@ -130,12 +131,34 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
stream_is_available,
reason,
) = stream.availability_strategy.check_availability_and_parsability(stream, logger, self)
except AirbyteTracedException as ate:
errors.append(f"Unable to connect to stream {stream.name} - {ate.message}")
tracebacks.append(traceback.format_exc())
except Exception:
errors.append(f"Unable to connect to stream {stream.name} - {''.join(traceback.format_exc())}")
errors.append(f"Unable to connect to stream {stream.name}")
tracebacks.append(traceback.format_exc())
else:
if not stream_is_available and reason:
errors.append(reason)

if len(errors) == 1 and len(tracebacks) == 1:
raise AirbyteTracedException(
internal_message=tracebacks[0],
message=f"{errors[0]}",
failure_type=FailureType.config_error,
)
if len(errors) == 1 and len(tracebacks) == 0:
raise AirbyteTracedException(
message=f"{errors[0]}",
failure_type=FailureType.config_error,
)
elif len(errors) > 1:
raise AirbyteTracedException(
internal_message="\n".join(tracebacks),
message=f"{len(errors)} streams with errors: {', '.join(error for error in errors)}",
failure_type=FailureType.config_error,
)

return not bool(errors), (errors or None)

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
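With the aggregation above, a failed check surfaces as one AirbyteTracedException: message stays short and readable, while internal_message carries the joined tracebacks. A rough sketch of a call site observing it; source, logger, and config are assumed to exist and are not taken from the diff:

```python
from airbyte_cdk import AirbyteTracedException

try:
    ok, errors = source.check_connection(logger, config)
except AirbyteTracedException as exc:
    # exc.message is the short summary shown to the user (e.g. the encoding hint);
    # the joined tracebacks were passed as internal_message for debugging.
    print(exc.message)
```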
@@ -52,7 +52,12 @@ def read_data(
quoting=csv.QUOTE_MINIMAL,
)
with stream_reader.open_file(file, file_read_mode, config_format.encoding, logger) as fp:
headers = self._get_headers(fp, config_format, dialect_name)
try:
headers = self._get_headers(fp, config_format, dialect_name)
except UnicodeError:
raise AirbyteTracedException(
message=f"{FileBasedSourceError.ENCODING_ERROR.value} Expected encoding: {config_format.encoding}",
)

rows_to_skip = (
config_format.skip_rows_before_header
@@ -274,7 +279,7 @@ def _pre_propcess_property_types(property_types: Dict[str, Any]) -> Mapping[str,

@staticmethod
def _cast_types(
row: Dict[str, str], deduped_property_types: Dict[str, str], config_format: CsvFormat, logger: logging.Logger
row: Dict[str, str], deduped_property_types: Mapping[str, str], config_format: CsvFormat, logger: logging.Logger
) -> Dict[str, Any]:
"""
Casts the values in the input 'row' dictionary according to the types defined in the JSON schema.
@@ -54,7 +54,7 @@ def state(self, value: MutableMapping[str, Any]) -> None:
"""State setter, accept state serialized by state getter."""
self._cursor.set_initial_state(value)

@property
@property # type: ignore # mypy complains wrong type, but AbstractFileBasedCursor is parent of file-based cursors
def cursor(self) -> Optional[AbstractFileBasedCursor]:
return self._cursor

@@ -172,13 +172,14 @@ def get_json_schema(self) -> JsonSchema:
try:
schema = self._get_raw_json_schema()
except InvalidSchemaError as config_exception:
self.logger.exception(FileBasedSourceError.SCHEMA_INFERENCE_ERROR.value, exc_info=config_exception)
raise AirbyteTracedException(
internal_message="Please check the logged errors for more information.",
message=FileBasedSourceError.SCHEMA_INFERENCE_ERROR.value,
exception=AirbyteTracedException(exception=config_exception),
failure_type=FailureType.config_error,
)
except AirbyteTracedException as ate:
raise ate
except Exception as exc:
raise SchemaInferenceError(FileBasedSourceError.SCHEMA_INFERENCE_ERROR, stream=self.name) from exc
else:
@@ -279,6 +280,8 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
for task in done:
try:
base_schema = merge_schemas(base_schema, task.result())
except AirbyteTracedException as ate:
raise ate
except Exception as exc:
self.logger.error(f"An error occurred inferring the schema. \n {traceback.format_exc()}", exc_info=exc)

@@ -287,6 +290,8 @@ async def _infer_file_schema(self, file: RemoteFile) -> SchemaType:
async def _infer_file_schema(self, file: RemoteFile) -> SchemaType:
try:
return await self.get_parser().infer_schema(self.config, file, self.stream_reader, self.logger)
except AirbyteTracedException as ate:
raise ate
except Exception as exc:
raise SchemaInferenceError(
FileBasedSourceError.SCHEMA_INFERENCE_ERROR,
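The repeated "except AirbyteTracedException as ate: raise ate" clauses exist only to stop the traced exception from being re-wrapped as SchemaInferenceError (or CheckAvailabilityError) by the broader handler below it. A minimal stand-in illustrating why the clause ordering matters; this is not CDK code:

```python
from airbyte_cdk import AirbyteTracedException

class SchemaInferenceError(Exception):
    """Stand-in for the CDK error that would otherwise mask the traced message."""

def infer_with_fallback(parse):
    try:
        return parse()
    except AirbyteTracedException:
        # Re-raise first so the user-friendly message survives unchanged.
        raise
    except Exception as exc:
        raise SchemaInferenceError("schema inference failed") from exc
```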
@@ -520,6 +520,17 @@ def test_parse_field_size_larger_than_default_python_maximum(self) -> None:
data_generator = self._read_data()
assert list(data_generator) == [{"header1": "1", "header2": long_string}]

def test_read_data_with_encoding_error(self) -> None:
self._stream_reader.open_file.return_value = CsvFileBuilder().with_data(["something"]).build()
self._csv_reader._get_headers = Mock(side_effect=UnicodeDecodeError("encoding", b"", 0, 1, "reason"))

with pytest.raises(AirbyteTracedException) as ate:
data_generator = self._read_data()
assert len(list(data_generator)) == 0

assert "encoding" in ate.value.message
assert self._csv_reader._get_headers.called

def _read_data(self) -> Generator[Dict[str, str], None, None]:
data_generator = self._csv_reader.read_data(
self._config,
@@ -1,7 +1,7 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk import AirbyteTracedException
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError
from unit_tests.sources.file_based.helpers import (
FailingSchemaValidationPolicy,
@@ -130,7 +130,7 @@
_base_failure_scenario.copy()
.set_name("error_empty_stream_scenario")
.set_source_builder(_base_failure_scenario.copy().source_builder.copy().set_files({}))
.set_expected_check_error(None, FileBasedSourceError.EMPTY_STREAM.value)
.set_expected_check_error(AirbyteTracedException, FileBasedSourceError.EMPTY_STREAM.value)
).build()


@@ -142,7 +142,7 @@
TestErrorListMatchingFilesInMemoryFilesStreamReader(files=_base_failure_scenario.source_builder._files, file_type="csv")
)
)
.set_expected_check_error(None, FileBasedSourceError.ERROR_LISTING_FILES.value)
.set_expected_check_error(AirbyteTracedException, FileBasedSourceError.ERROR_LISTING_FILES.value)
).build()


@@ -154,7 +154,7 @@
TestErrorOpenFileInMemoryFilesStreamReader(files=_base_failure_scenario.source_builder._files, file_type="csv")
)
)
.set_expected_check_error(None, FileBasedSourceError.ERROR_READING_FILE.value)
.set_expected_check_error(AirbyteTracedException, FileBasedSourceError.ERROR_READING_FILE.value)
).build()


@@ -216,5 +216,5 @@
],
}
)
.set_expected_check_error(None, FileBasedSourceError.ERROR_READING_FILE.value)
.set_expected_check_error(AirbyteTracedException, FileBasedSourceError.ERROR_READING_FILE.value)
).build()
@@ -3240,6 +3240,7 @@
}
)
.set_expected_records(None)
.set_expected_check_error(AirbyteTracedException, None)
).build()

csv_no_records_scenario: TestScenario[InMemoryFilesSource] = (
@@ -3343,4 +3344,5 @@
}
)
.set_expected_records(None)
.set_expected_check_error(AirbyteTracedException, None)
).build()
@@ -182,12 +182,11 @@ def verify_check(capsys: CaptureFixture[str], tmp_path: PosixPath, scenario: Tes
expected_exc, expected_msg = scenario.expected_check_error

if expected_exc:
with pytest.raises(expected_exc):
output = check(capsys, tmp_path, scenario)
if expected_msg:
# expected_msg is a string. what's the expected value field?
assert expected_msg in output["message"] # type: ignore
assert output["status"] == scenario.expected_check_status
with pytest.raises(expected_exc) as exc:
check(capsys, tmp_path, scenario)

if expected_msg:
assert expected_msg in exc.value.message

else:
output = check(capsys, tmp_path, scenario)
