diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/CsvSources.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/CsvSources.java index 207985491f..913767b7e8 100644 --- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/CsvSources.java +++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/CsvSources.java @@ -22,26 +22,26 @@ public class CsvSources { public static CSVFormat toCsvFormat(TextFormat format) { switch (format) { case EXCEL: - return CSVFormat.EXCEL; + return CSVFormat.EXCEL.withNullString(""); case INFORMIX: - return CSVFormat.INFORMIX_UNLOAD_CSV; + return CSVFormat.INFORMIX_UNLOAD_CSV.withNullString(""); case MONGO: - return CSVFormat.MONGODB_CSV; + return CSVFormat.MONGODB_CSV.withNullString(""); case MONGO_TSV: - return CSVFormat.MONGODB_TSV; + return CSVFormat.MONGODB_TSV.withNullString(""); case MYSQL: - return CSVFormat.MYSQL; + return CSVFormat.MYSQL.withNullString(""); case ORACLE: - return CSVFormat.ORACLE; + return CSVFormat.ORACLE.withNullString(""); case POSTGRES: - return CSVFormat.POSTGRESQL_TEXT; + return CSVFormat.POSTGRESQL_TEXT.withNullString(""); case POSTGRESQL_CSV: - return CSVFormat.POSTGRESQL_CSV; + return CSVFormat.POSTGRESQL_CSV.withNullString(""); case RFC4180: - return CSVFormat.RFC4180; + return CSVFormat.RFC4180.withNullString(""); case DEFAULT: default: - return CSVFormat.DEFAULT; + return CSVFormat.DEFAULT.withNullString(""); } } } diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamUtils.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamUtils.java index 50a7e5f945..573879d91e 100644 --- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamUtils.java +++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamUtils.java @@ -150,7 +150,7 @@ public static Schema toBeamSchema( public static Schema textToBeamSchema(List fields) { return new Schema( fields.stream() - .map(field -> Schema.Field.of(field, FieldType.STRING)) + .map(field -> Schema.Field.of(field, FieldType.STRING).withNullable(true)) .collect(Collectors.toList())); } diff --git a/v2/googlecloud-to-neo4j/src/test/java/com/google/cloud/teleport/v2/neo4j/model/helpers/CsvSourcesTest.java b/v2/googlecloud-to-neo4j/src/test/java/com/google/cloud/teleport/v2/neo4j/model/helpers/CsvSourcesTest.java new file mode 100644 index 0000000000..1f28e9cb9b --- /dev/null +++ b/v2/googlecloud-to-neo4j/src/test/java/com/google/cloud/teleport/v2/neo4j/model/helpers/CsvSourcesTest.java @@ -0,0 +1,72 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.neo4j.model.helpers; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.cloud.teleport.v2.neo4j.model.sources.TextFormat; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class CsvSourcesTest { + + @Parameterized.Parameter(0) + public TextFormat textFormat; + + @Parameterized.Parameter(1) + public char delimiter; + + @Parameterized.Parameters(name = "{0}") + public static List testParameters() { + return Arrays.asList( + new Object[][] { + {TextFormat.EXCEL, ','}, + {TextFormat.INFORMIX, ','}, + {TextFormat.MONGO, ','}, + {TextFormat.MONGO_TSV, '\t'}, + {TextFormat.MYSQL, '\t'}, + {TextFormat.ORACLE, ','}, + {TextFormat.POSTGRES, '\t'}, + {TextFormat.POSTGRESQL_CSV, ','}, + {TextFormat.RFC4180, ','}, + {TextFormat.DEFAULT, ','} + }); + } + + @Test + public void shouldParseEmptyColumnsAsNullValues() throws IOException { + String line = "1" + delimiter; + CSVFormat csvFormat = CsvSources.toCsvFormat(textFormat); + + try (CSVParser csvParser = CSVParser.parse(line, csvFormat)) { + List csvRecords = csvParser.getRecords(); + assertThat(csvRecords).hasSize(1); + + CSVRecord csvRecord = csvRecords.get(0); + assertThat(csvRecord).hasSize(2); + assertThat(csvRecord.get(0)).isEqualTo("1"); + assertThat(csvRecord.get(1)).isEqualTo(null); + } + } +} diff --git a/v2/googlecloud-to-neo4j/src/test/java/com/google/cloud/teleport/v2/neo4j/templates/DataConversionIT.java b/v2/googlecloud-to-neo4j/src/test/java/com/google/cloud/teleport/v2/neo4j/templates/DataConversionIT.java index 43465f21e1..3621f8fd1e 100644 --- a/v2/googlecloud-to-neo4j/src/test/java/com/google/cloud/teleport/v2/neo4j/templates/DataConversionIT.java +++ b/v2/googlecloud-to-neo4j/src/test/java/com/google/cloud/teleport/v2/neo4j/templates/DataConversionIT.java @@ -43,12 +43,14 @@ import java.util.stream.Collectors; import org.apache.beam.it.common.PipelineLauncher.LaunchConfig; import org.apache.beam.it.common.PipelineLauncher.LaunchInfo; +import org.apache.beam.it.common.PipelineOperator; import org.apache.beam.it.common.TestProperties; import org.apache.beam.it.common.utils.ResourceManagerUtils; import org.apache.beam.it.conditions.ConditionCheck; import org.apache.beam.it.gcp.TemplateTestBase; import org.apache.beam.it.gcp.bigquery.BigQueryResourceManager; import org.apache.beam.it.neo4j.Neo4jResourceManager; +import org.apache.beam.it.neo4j.conditions.Neo4jQueryCheck; import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.UnknownKeyFor; @@ -302,6 +304,46 @@ public void supportsMappedTypesForExternalCsv() throws Exception { .meetsConditions(); } + @Test + public void treatsEmptyCsvStringsAsNulls() throws IOException { + gcsClient.createArtifact( + "external.csv", contentOf("/testing-specs/data-conversion/empty-strings.csv")); + gcsClient.createArtifact( + "spec.json", contentOf("/testing-specs/data-conversion/empty-strings-spec.json")); + gcsClient.createArtifact( + "neo4j.json", + String.format( + "{\n" + + " \"server_url\": \"%s\",\n" + + " \"database\": \"%s\",\n" + + " \"auth_type\": \"basic\",\n" + + " \"username\": \"neo4j\",\n" + + " \"pwd\": \"%s\"\n" + + "}", + neo4jClient.getUri(), neo4jClient.getDatabaseName(), neo4jClient.getAdminPassword())); + + LaunchConfig.Builder options = + LaunchConfig.builder(testName, specPath) + .addParameter("jobSpecUri", getGcsPath("spec.json")) + .addParameter("neo4jConnectionUri", getGcsPath("neo4j.json")) + .addParameter( + "optionsJson", + String.format("{\"externalcsvuri\": \"%s\"}", getGcsPath("external.csv"))); + + LaunchInfo info = launchTemplate(options); + + assertThatPipeline(info).isRunning(); + PipelineOperator.Result result = + pipelineOperator() + .waitForCondition( + createConfig(info), + Neo4jQueryCheck.builder(neo4jClient) + .setQuery("MATCH (n:Node) RETURN n {.*} as properties") + .setExpectedResult(List.of(Map.of("properties", Map.of("int64", "1")))) + .build()); + assertThatResult(result).meetsConditions(); + } + @SuppressWarnings("rawtypes") @NotNull private Supplier[] generateChecks(Map expectedRow) { diff --git a/v2/googlecloud-to-neo4j/src/test/resources/testing-specs/data-conversion/empty-strings-spec.json b/v2/googlecloud-to-neo4j/src/test/resources/testing-specs/data-conversion/empty-strings-spec.json new file mode 100644 index 0000000000..42efba53e8 --- /dev/null +++ b/v2/googlecloud-to-neo4j/src/test/resources/testing-specs/data-conversion/empty-strings-spec.json @@ -0,0 +1,28 @@ +{ + "source": { + "type": "text", + "name": "data", + "ordered_field_names": "int64,datetime", + "uri": "$externalcsvuri" + }, + "targets": [ + { + "node": { + "name": "Node", + "source": "data", + "mode": "append", + "mappings": { + "label": "\"Node\"", + "properties": { + "keys": [ + "int64" + ], + "dates": [ + "datetime" + ] + } + } + } + } + ] +} diff --git a/v2/googlecloud-to-neo4j/src/test/resources/testing-specs/data-conversion/empty-strings.csv b/v2/googlecloud-to-neo4j/src/test/resources/testing-specs/data-conversion/empty-strings.csv new file mode 100644 index 0000000000..3b88900682 --- /dev/null +++ b/v2/googlecloud-to-neo4j/src/test/resources/testing-specs/data-conversion/empty-strings.csv @@ -0,0 +1 @@ +1, \ No newline at end of file