Skip to content

Commit

Permalink
feat: treat empty strings as null values from csv imports (#1996)
Browse files Browse the repository at this point in the history
* test: add integration test for empty string csv

* Fix typo and make CSV schema fields nullable

Signed-off-by: Florent Biville <florent.biville@neo4j.com>

* chore: clean up unnecessary checks and tests

* chore: fix style

* test: add unit test for CsvSources

* test: improvement on CsvSources test

* chore: fix style

* fix: add yaml support for specification parser

* Revert "fix: add yaml support for specification parser"

This reverts commit 03ff0ef.

---------

Signed-off-by: Florent Biville <florent.biville@neo4j.com>
  • Loading branch information
Emrehzl94 authored Nov 11, 2024
1 parent 1bef5ff commit 4e550bc
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,26 @@ public class CsvSources {
/**
 * Maps a {@link TextFormat} onto the matching predefined {@link CSVFormat}, configured so that
 * empty strings are treated as {@code null} values.
 *
 * <p>Every branch applies {@code withNullString("")}, so a column left empty in the imported file
 * is read as {@code null} instead of {@code ""}.
 *
 * <p>NOTE(review): a predefined format may already declare its own null string; calling {@code
 * withNullString("")} replaces it. Confirm this is intended for every format listed here.
 *
 * @param format the text format declared for the source
 * @return the corresponding CSV format with the empty string as its null string; unknown values
 *     fall back to the {@code DEFAULT} format
 */
public static CSVFormat toCsvFormat(TextFormat format) {
  switch (format) {
    case EXCEL:
      return CSVFormat.EXCEL.withNullString("");
    case INFORMIX:
      return CSVFormat.INFORMIX_UNLOAD_CSV.withNullString("");
    case MONGO:
      return CSVFormat.MONGODB_CSV.withNullString("");
    case MONGO_TSV:
      return CSVFormat.MONGODB_TSV.withNullString("");
    case MYSQL:
      return CSVFormat.MYSQL.withNullString("");
    case ORACLE:
      return CSVFormat.ORACLE.withNullString("");
    case POSTGRES:
      return CSVFormat.POSTGRESQL_TEXT.withNullString("");
    case POSTGRESQL_CSV:
      return CSVFormat.POSTGRESQL_CSV.withNullString("");
    case RFC4180:
      return CSVFormat.RFC4180.withNullString("");
    case DEFAULT:
    default:
      return CSVFormat.DEFAULT.withNullString("");
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public static Schema toBeamSchema(
/**
 * Builds a Beam {@link Schema} for a text source from its field names.
 *
 * <p>Each name becomes a {@code STRING} field, in the order given. The fields are marked nullable
 * so that a row may carry {@code null} for a column.
 *
 * @param fields the field names of the text source
 * @return a schema with one nullable {@code STRING} field per name
 */
public static Schema textToBeamSchema(List<String> fields) {
  return new Schema(
      fields.stream()
          .map(field -> Schema.Field.of(field, FieldType.STRING).withNullable(true))
          .collect(Collectors.toList()));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.neo4j.model.helpers;

import static com.google.common.truth.Truth.assertThat;

import com.google.cloud.teleport.v2.neo4j.model.sources.TextFormat;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Checks, for each listed {@link TextFormat}, that the {@link CSVFormat} returned by {@code
 * CsvSources.toCsvFormat} reads an empty column as {@code null}.
 */
@RunWith(Parameterized.class)
public class CsvSourcesTest {

  /** Text format under test; populated by the {@link Parameterized} runner. */
  @Parameterized.Parameter(0)
  public TextFormat textFormat;

  /** Character used to separate the two columns of the input line for {@link #textFormat}. */
  @Parameterized.Parameter(1)
  public char delimiter;

  /**
   * Supplies one {@code {format, delimiter}} pair per test run.
   *
   * @return the parameter rows; the first element of each row also names the test run
   */
  @Parameterized.Parameters(name = "{0}")
  public static List<Object[]> testParameters() {
    return Arrays.asList(
        new Object[][] {
          {TextFormat.EXCEL, ','},
          {TextFormat.INFORMIX, ','},
          {TextFormat.MONGO, ','},
          {TextFormat.MONGO_TSV, '\t'},
          {TextFormat.MYSQL, '\t'},
          {TextFormat.ORACLE, ','},
          {TextFormat.POSTGRES, '\t'},
          {TextFormat.POSTGRESQL_CSV, ','},
          {TextFormat.RFC4180, ','},
          {TextFormat.DEFAULT, ','}
        });
  }

  @Test
  public void shouldParseEmptyColumnsAsNullValues() throws IOException {
    // One record with two columns: "1" followed by an empty value.
    String line = "1" + delimiter;
    CSVFormat csvFormat = CsvSources.toCsvFormat(textFormat);

    try (CSVParser csvParser = CSVParser.parse(line, csvFormat)) {
      List<CSVRecord> csvRecords = csvParser.getRecords();
      assertThat(csvRecords).hasSize(1);

      CSVRecord csvRecord = csvRecords.get(0);
      assertThat(csvRecord).hasSize(2);
      assertThat(csvRecord.get(0)).isEqualTo("1");
      // The empty trailing column must surface as null rather than "".
      assertThat(csvRecord.get(1)).isNull();
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@
import java.util.stream.Collectors;
import org.apache.beam.it.common.PipelineLauncher.LaunchConfig;
import org.apache.beam.it.common.PipelineLauncher.LaunchInfo;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.conditions.ConditionCheck;
import org.apache.beam.it.gcp.TemplateTestBase;
import org.apache.beam.it.gcp.bigquery.BigQueryResourceManager;
import org.apache.beam.it.neo4j.Neo4jResourceManager;
import org.apache.beam.it.neo4j.conditions.Neo4jQueryCheck;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
Expand Down Expand Up @@ -302,6 +304,46 @@ public void supportsMappedTypesForExternalCsv() throws Exception {
.meetsConditions();
}

/**
 * Launches the template against a CSV file and waits until the {@code Node} query returns a
 * single node whose only property is {@code int64 = "1"}.
 */
@Test
public void treatsEmptyCsvStringsAsNulls() throws IOException {
  // Upload the CSV data and the job specification that reads it.
  gcsClient.createArtifact(
      "external.csv", contentOf("/testing-specs/data-conversion/empty-strings.csv"));
  gcsClient.createArtifact(
      "spec.json", contentOf("/testing-specs/data-conversion/empty-strings-spec.json"));

  // Upload the connection settings for the Neo4j instance used by this test.
  String connectionTemplate =
      "{\n"
          + " \"server_url\": \"%s\",\n"
          + " \"database\": \"%s\",\n"
          + " \"auth_type\": \"basic\",\n"
          + " \"username\": \"neo4j\",\n"
          + " \"pwd\": \"%s\"\n"
          + "}";
  String connectionJson =
      String.format(
          connectionTemplate,
          neo4jClient.getUri(),
          neo4jClient.getDatabaseName(),
          neo4jClient.getAdminPassword());
  gcsClient.createArtifact("neo4j.json", connectionJson);

  LaunchConfig.Builder launchOptions =
      LaunchConfig.builder(testName, specPath)
          .addParameter("jobSpecUri", getGcsPath("spec.json"))
          .addParameter("neo4jConnectionUri", getGcsPath("neo4j.json"))
          .addParameter(
              "optionsJson",
              String.format("{\"externalcsvuri\": \"%s\"}", getGcsPath("external.csv")));

  LaunchInfo launchInfo = launchTemplate(launchOptions);
  assertThatPipeline(launchInfo).isRunning();

  // Expect exactly one node exposing only the "int64" property.
  PipelineOperator.Result outcome =
      pipelineOperator()
          .waitForCondition(
              createConfig(launchInfo),
              Neo4jQueryCheck.builder(neo4jClient)
                  .setQuery("MATCH (n:Node) RETURN n {.*} as properties")
                  .setExpectedResult(List.of(Map.of("properties", Map.of("int64", "1"))))
                  .build());
  assertThatResult(outcome).meetsConditions();
}

@SuppressWarnings("rawtypes")
@NotNull
private Supplier[] generateChecks(Map<String, Object> expectedRow) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"source": {
"type": "text",
"name": "data",
"ordered_field_names": "int64,datetime",
"uri": "$externalcsvuri"
},
"targets": [
{
"node": {
"name": "Node",
"source": "data",
"mode": "append",
"mappings": {
"label": "\"Node\"",
"properties": {
"keys": [
"int64"
],
"dates": [
"datetime"
]
}
}
}
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1,

0 comments on commit 4e550bc

Please sign in to comment.