Skip to content

Commit e9f378e

Browse files
Merge branch 'main' into ac8/fix-calling-transformations-twice
2 parents 666bd76 + 8b534b0 commit e9f378e

File tree

5 files changed

+46
-3
lines changed

5 files changed

+46
-3
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3630,6 +3630,9 @@ definitions:
36303630
delimiter:
36313631
type: string
36323632
default: ","
3633+
set_empty_cell_to_none:
3634+
type: boolean
3635+
default: false
36333636
AsyncJobStatusMap:
36343637
description: Matches the api job status to Async Job Status.
36353638
type: object

airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ class CsvParser(Parser):
103103
# TODO: migrate implementation to re-use file-base classes
104104
encoding: Optional[str] = "utf-8"
105105
delimiter: Optional[str] = ","
106+
set_empty_cell_to_none: Optional[bool] = False
106107

107108
def _get_delimiter(self) -> Optional[str]:
108109
"""
@@ -121,6 +122,8 @@ def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
121122
text_data = TextIOWrapper(data, encoding=self.encoding) # type: ignore
122123
reader = csv.DictReader(text_data, delimiter=self._get_delimiter() or ",")
123124
for row in reader:
125+
if self.set_empty_cell_to_none:
126+
row = {k: (None if v == "" else v) for k, v in row.items()}
124127
yield row
125128

126129

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1383,6 +1383,7 @@ class CsvDecoder(BaseModel):
13831383
type: Literal["CsvDecoder"]
13841384
encoding: Optional[str] = "utf-8"
13851385
delimiter: Optional[str] = ","
1386+
set_empty_cell_to_none: Optional[bool] = False
13861387

13871388

13881389
class AsyncJobStatusMap(BaseModel):

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2648,7 +2648,11 @@ def _get_parser(model: BaseModel, config: Config) -> Parser:
26482648
elif isinstance(model, JsonlDecoderModel):
26492649
return JsonLineParser()
26502650
elif isinstance(model, CsvDecoderModel):
2651-
return CsvParser(encoding=model.encoding, delimiter=model.delimiter)
2651+
return CsvParser(
2652+
encoding=model.encoding,
2653+
delimiter=model.delimiter,
2654+
set_empty_cell_to_none=model.set_empty_cell_to_none,
2655+
)
26522656
elif isinstance(model, GzipDecoderModel):
26532657
return GzipParser(
26542658
inner_parser=ModelToComponentFactory._get_parser(model.decoder, config)

unit_tests/sources/declarative/decoders/test_composite_decoder.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,24 @@ def compress_with_gzip(data: str, encoding: str = "utf-8"):
4141

4242

4343
def generate_csv(
44-
encoding: str = "utf-8", delimiter: str = ",", should_compress: bool = False
44+
encoding: str = "utf-8",
45+
delimiter: str = ",",
46+
should_compress: bool = False,
47+
add_empty_strings: bool = False,
4548
) -> bytes:
4649
data = [
4750
{"id": "1", "name": "John", "age": "28"},
4851
{"id": "2", "name": "Alice", "age": "34"},
4952
{"id": "3", "name": "Bob", "age": "25"},
5053
]
54+
fieldnames = ["id", "name", "age"]
55+
if add_empty_strings:
56+
for row in data:
57+
row["gender"] = ""
58+
fieldnames.append("gender")
5159

5260
output = StringIO()
53-
writer = csv.DictWriter(output, fieldnames=["id", "name", "age"], delimiter=delimiter)
61+
writer = csv.DictWriter(output, fieldnames=fieldnames, delimiter=delimiter)
5462
writer.writeheader()
5563
for row in data:
5664
writer.writerow(row)
@@ -258,6 +266,30 @@ def test_composite_raw_decoder_csv_parser_values(requests_mock, encoding: str, d
258266
assert parsed_records == expected_data
259267

260268

269+
@pytest.mark.parametrize("set_empty_cell_to_none", [True, False])
270+
def test_composite_raw_decoder_parse_empty_strings(requests_mock, set_empty_cell_to_none: bool):
271+
requests_mock.register_uri(
272+
"GET",
273+
"https://airbyte.io/",
274+
content=generate_csv(should_compress=False, add_empty_strings=True),
275+
)
276+
response = requests.get("https://airbyte.io/", stream=True)
277+
278+
parser = CsvParser(set_empty_cell_to_none=set_empty_cell_to_none)
279+
composite_raw_decoder = CompositeRawDecoder(parser=parser)
280+
281+
expected_data = [
282+
{"id": "1", "name": "John", "age": "28"},
283+
{"id": "2", "name": "Alice", "age": "34"},
284+
{"id": "3", "name": "Bob", "age": "25"},
285+
]
286+
for expected_record in expected_data:
287+
expected_record["gender"] = None if set_empty_cell_to_none else ""
288+
289+
parsed_records = list(composite_raw_decoder.decode(response))
290+
assert parsed_records == expected_data
291+
292+
261293
class TestServer(BaseHTTPRequestHandler):
262294
__test__: ClassVar[bool] = False # Tell Pytest this is not a Pytest class, despite its name
263295

0 commit comments

Comments
 (0)