Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(airbyte-cdk): use orjson to speed up parsing #44829

Merged
merged 3 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import json
import logging
from dataclasses import InitVar, dataclass
from typing import Any, Generator, Mapping

import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from orjson import orjson

logger = logging.getLogger("airbyte")

Expand Down Expand Up @@ -72,4 +72,4 @@ def decode(self, response: requests.Response) -> Generator[Mapping[str, Any], No
# TODO???: set delimiter? usually it is `\n` but maybe it would be useful to set optional?
# https://github.com/airbytehq/airbyte-internal-issues/issues/8436
for record in response.iter_lines():
yield json.loads(record)
yield orjson.loads(record)
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_helpers import TYPE_PYTHON_MAPPING, SchemaType
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from orjson import orjson

DIALECT_NAME = "_config_dialect"

Expand Down Expand Up @@ -274,7 +275,7 @@ def _pre_propcess_property_types(property_types: Dict[str, Any]) -> Mapping[str,

@staticmethod
def _cast_types(
row: Dict[str, str], deduped_property_types: Dict[str, str], config_format: CsvFormat, logger: logging.Logger
row: Dict[str, str], deduped_property_types: Mapping[str, str], config_format: CsvFormat, logger: logging.Logger
) -> Dict[str, Any]:
"""
Casts the values in the input 'row' dictionary according to the types defined in the JSON schema.
Expand Down Expand Up @@ -308,8 +309,8 @@ def _cast_types(
elif python_type == dict:
try:
# we don't re-use _value_to_object here because we type the column as object as long as there is only one object
cast_value = json.loads(value)
except json.JSONDecodeError:
cast_value = orjson.loads(value)
except orjson.JSONDecodeError:
warnings.append(_format_warning(key, value, prop_type))

elif python_type == list:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_helpers import PYTHON_TYPE_MAPPING, SchemaType, merge_schemas
from orjson import orjson


class JsonlParser(FileTypeParser):
Expand Down Expand Up @@ -100,15 +101,15 @@ def _parse_jsonl_entries(
read_bytes += len(line)
accumulator += line # type: ignore [operator] # In reality, it's either bytes or string and we add the same type
try:
record = json.loads(accumulator)
record = orjson.loads(accumulator)
if had_json_parsing_error and not has_warned_for_multiline_json_object:
logger.warning(f"File at {file.uri} is using multiline JSON. Performance could be greatly reduced")
has_warned_for_multiline_json_object = True

yield record
yielded_at_least_once = True
accumulator = self._instantiate_accumulator(line)
except json.JSONDecodeError:
except orjson.JSONDecodeError:
had_json_parsing_error = True

if read_limit and yielded_at_least_once and read_bytes >= self.MAX_BYTES_PER_FILE_FOR_SCHEMA_INFERENCE:
Expand Down
6 changes: 3 additions & 3 deletions airbyte-cdk/python/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions airbyte-cdk/python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ unstructured = { version = "0.10.27", extras = ["docx", "pptx"], optional = true
pyjwt = "^2.8.0"
cryptography = "^42.0.5"
pytz = "2024.1"
orjson = "^3.10.7"

[tool.poetry.group.dev.dependencies]
freezegun = "*"
Expand Down
Loading