Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,25 @@
* Convenience object for invoking SQL query as well as providing descriptions for read and cast
* phase of transform.
*/
public class SqlQuerySpec {
public class BigQuerySpec {

private final String readDescription;
private final String castDescription;
private final String sql;
private final String queryTempProject;
private final String queryTempDataset;

public SqlQuerySpec(String readDescription, String castDescription, String sql) {
public BigQuerySpec(
String readDescription,
String castDescription,
String sql,
String queryTempProject,
String queryTempDataset) {

Check warning on line 35 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/BigQuerySpec.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/BigQuerySpec.java#L35

Added line #L35 was not covered by tests
this.readDescription = readDescription;
this.castDescription = castDescription;
this.sql = sql;
this.queryTempProject = queryTempProject;
this.queryTempDataset = queryTempDataset;

Check warning on line 40 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/BigQuerySpec.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/BigQuerySpec.java#L39-L40

Added lines #L39 - L40 were not covered by tests
}

public String getReadDescription() {
Expand All @@ -43,29 +52,50 @@
return sql;
}

public static class SqlQuerySpecBuilder {
public String getQueryTempProject() {
return queryTempProject;

Check warning on line 56 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/BigQuerySpec.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/BigQuerySpec.java#L56

Added line #L56 was not covered by tests
}

public String getQueryTempDataset() {
return queryTempDataset;

Check warning on line 60 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/BigQuerySpec.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/BigQuerySpec.java#L60

Added line #L60 was not covered by tests
}

public static class BigQuerySpecBuilder {

Check warning on line 63 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/BigQuerySpec.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/BigQuerySpec.java#L63

Added line #L63 was not covered by tests

private String readDescription;
private String castDescription;
private String sql;
private String queryTempProject;
private String queryTempDataset;

public SqlQuerySpecBuilder readDescription(String readDescription) {
public BigQuerySpecBuilder readDescription(String readDescription) {
this.readDescription = readDescription;
return this;
}

public SqlQuerySpecBuilder castDescription(String castDescription) {
public BigQuerySpecBuilder castDescription(String castDescription) {
this.castDescription = castDescription;
return this;
}

public SqlQuerySpecBuilder sql(String sql) {
public BigQuerySpecBuilder sql(String sql) {
this.sql = sql;
return this;
}

public SqlQuerySpec build() {
return new SqlQuerySpec(readDescription, castDescription, sql);
public BigQuerySpecBuilder queryTempProject(String queryTempProject) {
this.queryTempProject = queryTempProject;
return this;

Check warning on line 88 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/BigQuerySpec.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/BigQuerySpec.java#L87-L88

Added lines #L87 - L88 were not covered by tests
}

public BigQuerySpecBuilder queryTempDataset(String queryTempDataset) {
this.queryTempDataset = queryTempDataset;
return this;

Check warning on line 93 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/BigQuerySpec.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/BigQuerySpec.java#L92-L93

Added lines #L92 - L93 were not covered by tests
}

public BigQuerySpec build() {
return new BigQuerySpec(

Check warning on line 97 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/BigQuerySpec.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/BigQuerySpec.java#L97

Added line #L97 was not covered by tests
readDescription, castDescription, sql, queryTempProject, queryTempDataset);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,19 @@

private final String name;
private final String query;
private final String queryTempProject;
private final String queryTempDataset;

public BigQuerySource(String name, String query) {
this(name, query, null, null);
}

public BigQuerySource(
String name, String query, String queryTempProject, String queryTempDataset) {
this.name = name;
this.query = query;
this.queryTempProject = queryTempProject;
this.queryTempDataset = queryTempDataset;
}

@Override
Expand All @@ -43,6 +51,14 @@
return query;
}

public String getQueryTempProject() {
return queryTempProject;
}

public String getQueryTempDataset() {
return queryTempDataset;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -62,6 +78,19 @@

@Override
public String toString() {
return "BigQuerySource{" + "name='" + name + '\'' + ", query='" + query + '\'' + '}';
return "BigQuerySource{"

Check warning on line 81 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/sources/BigQuerySource.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/sources/BigQuerySource.java#L81

Added line #L81 was not covered by tests
+ "name='"
+ name
+ '\''
+ ", query='"
+ query
+ '\''
+ ", queryTempProject='"
+ queryTempProject
+ '\''
+ ", queryTempDataset='"
+ queryTempDataset
+ '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,23 @@
*/
package com.google.cloud.teleport.v2.neo4j.model.sources;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Optional;
import org.neo4j.importer.v1.sources.SourceProvider;

public class BigQuerySourceProvider implements SourceProvider<BigQuerySource> {

@Override
public String supportedType() {
return "bigquery";
}

@Override
public BigQuerySource provide(ObjectNode node) {
return new BigQuerySource(node.get("name").textValue(), node.get("query").textValue());
return new BigQuerySource(
node.get("name").textValue(),
node.get("query").textValue(),
Optional.ofNullable(node.get("query_temp_project")).map(JsonNode::textValue).orElse(null),
Optional.ofNullable(node.get("query_temp_dataset")).map(JsonNode::textValue).orElse(null));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.neo4j.model.validation;

import com.google.cloud.teleport.v2.neo4j.model.sources.BigQuerySource;
import java.util.LinkedHashSet;
import java.util.Set;
import org.neo4j.importer.v1.sources.Source;
import org.neo4j.importer.v1.validation.SpecificationValidationResult;
import org.neo4j.importer.v1.validation.SpecificationValidator;

public class BigQuerySourceProjectDatasetValidator implements SpecificationValidator {

private static final String ERROR_CODE = "DFBQ-001";
private final Set<String> paths = new LinkedHashSet<>();

@Override
public void visitSource(int index, Source source) {
if (!(source instanceof BigQuerySource)) {
return;
}

var queryTempProject = ((BigQuerySource) source).getQueryTempProject();
var queryTempDataset = ((BigQuerySource) source).getQueryTempDataset();

if (queryTempProject != null && queryTempDataset == null) {
paths.add(String.format("$.sources[%d]", index));
}
}

@Override
public boolean report(SpecificationValidationResult.Builder builder) {
paths.forEach(
path ->
builder.addError(
path,
ERROR_CODE,
String.format(
"%s query_temp_project is provided, but query_temp_dataset is missing", path)));
return paths.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/
package com.google.cloud.teleport.v2.neo4j.providers.bigquery;

import com.google.cloud.teleport.v2.neo4j.model.helpers.SqlQuerySpec;
import com.google.cloud.teleport.v2.neo4j.model.helpers.SqlQuerySpec.SqlQuerySpecBuilder;
import com.google.cloud.teleport.v2.neo4j.model.helpers.BigQuerySpec;
import com.google.cloud.teleport.v2.neo4j.model.helpers.BigQuerySpec.BigQuerySpecBuilder;
import com.google.cloud.teleport.v2.neo4j.model.helpers.TargetQuerySpec;
import com.google.cloud.teleport.v2.neo4j.model.helpers.TargetSequence;
import com.google.cloud.teleport.v2.neo4j.model.job.OptionsParams;
Expand All @@ -33,11 +33,9 @@

/** Provider implementation for reading and writing BigQuery. */
public class BigQueryImpl implements Provider {

private static final Logger LOG = LoggerFactory.getLogger(BigQueryImpl.class);
private final BigQuerySource source;
private final TargetSequence targetSequence;

private OptionsParams optionsParams;

public BigQueryImpl(BigQuerySource source, TargetSequence targetSequence) {
Expand Down Expand Up @@ -67,15 +65,15 @@

@Override
public PTransform<PBegin, PCollection<Row>> queryMetadata() {
return new BqQueryToRow(getMetadataQueryBeamSpec(source));
return new BqQueryToRow(getMetadataQueryBeamSpec());

Check warning on line 68 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BigQueryImpl.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BigQueryImpl.java#L68

Added line #L68 was not covered by tests
}

/**
* Returns zero rows metadata query based on original query.
*
* @return helper object includes metadata and SQL
*/
public SqlQuerySpec getMetadataQueryBeamSpec(BigQuerySource source) {
public BigQuerySpec getMetadataQueryBeamSpec() {

String baseQuery = source.getQuery();

Expand All @@ -85,10 +83,12 @@
String zeroRowSql = "SELECT * FROM (" + baseQuery + ") LIMIT 0";
LOG.info("Reading BQ metadata with query: {}", zeroRowSql);

return new SqlQuerySpecBuilder()
return new BigQuerySpecBuilder()

Check warning on line 86 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BigQueryImpl.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BigQueryImpl.java#L86

Added line #L86 was not covered by tests
.readDescription("Read from BQ " + source.getName())
.castDescription("Cast to BeamRow " + source.getName())
.sql(zeroRowSql)
.queryTempProject(source.getQueryTempProject())
.queryTempDataset(source.getQueryTempDataset())

Check warning on line 91 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BigQueryImpl.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BigQueryImpl.java#L90-L91

Added lines #L90 - L91 were not covered by tests
.build();
}

Expand All @@ -97,11 +97,13 @@
*
* @return helper object includes metadata and SQL
*/
private SqlQuerySpec getSourceQueryBeamSpec() {
return new SqlQuerySpecBuilder()
private BigQuerySpec getSourceQueryBeamSpec() {
return new BigQuerySpecBuilder()

Check warning on line 101 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BigQueryImpl.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BigQueryImpl.java#L101

Added line #L101 was not covered by tests
.castDescription("Cast to BeamRow " + source.getName())
.readDescription("Read from BQ " + source.getName())
.sql(source.getQuery())
.queryTempProject(source.getQueryTempProject())
.queryTempDataset(source.getQueryTempDataset())

Check warning on line 106 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BigQueryImpl.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BigQueryImpl.java#L105-L106

Added lines #L105 - L106 were not covered by tests
.build();
}

Expand All @@ -110,20 +112,22 @@
*
* @return helper object includes metadata and SQL
*/
private SqlQuerySpec getTargetQueryBeamSpec(TargetQuerySpec spec) {
private BigQuerySpec getTargetQueryBeamSpec(TargetQuerySpec spec) {
var sourceFields = ModelUtils.getBeamFieldSet(spec.getSourceBeamSchema());
var target = spec.getTarget();
var startNodeTarget = spec.getStartNodeTarget();
var endNodeTarget = spec.getEndNodeTarget();
String sql =
ModelUtils.getTargetSql(
target, startNodeTarget, endNodeTarget, sourceFields, true, source.getQuery());
return new SqlQuerySpecBuilder()
return new BigQuerySpecBuilder()

Check warning on line 123 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BigQueryImpl.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BigQueryImpl.java#L123

Added line #L123 was not covered by tests
.readDescription(
targetSequence.getSequenceNumber(target) + ": Read from BQ " + target.getName())
.castDescription(
targetSequence.getSequenceNumber(target) + ": Cast to BeamRow " + target.getName())
.sql(sql)
.queryTempProject(source.getQueryTempProject())
.queryTempDataset(source.getQueryTempDataset())

Check warning on line 130 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BigQueryImpl.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BigQueryImpl.java#L129-L130

Added lines #L129 - L130 were not covered by tests
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.google.cloud.teleport.v2.neo4j.providers.bigquery;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.v2.neo4j.model.helpers.SqlQuerySpec;
import com.google.cloud.teleport.v2.neo4j.model.helpers.BigQuerySpec;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.schemas.Schema;
Expand All @@ -34,25 +34,33 @@
public class BqQueryToRow extends PTransform<PBegin, PCollection<Row>> {

private static final Logger LOG = LoggerFactory.getLogger(BqQueryToRow.class);
private final SqlQuerySpec bqQuerySpec;
private final BigQuerySpec bqQuerySpec;

public BqQueryToRow(SqlQuerySpec bqQuerySpec) {
public BqQueryToRow(BigQuerySpec bqQuerySpec) {

Check warning on line 39 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BqQueryToRow.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BqQueryToRow.java#L39

Added line #L39 was not covered by tests
this.bqQuerySpec = bqQuerySpec;
}

@Override
public PCollection<Row> expand(PBegin input) {

String rewrittenSql = this.bqQuerySpec.getSql();
LOG.info("Reading BQ with query: {}", rewrittenSql);

PCollection<TableRow> sourceRows =
input.apply(
bqQuerySpec.getReadDescription(),
BigQueryIO.readTableRowsWithSchema()
.fromQuery(rewrittenSql)
.usingStandardSql()
.withTemplateCompatibility());
var read =
BigQueryIO.readTableRowsWithSchema()
.fromQuery(rewrittenSql)
.usingStandardSql()
.withTemplateCompatibility();

Check warning on line 52 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BqQueryToRow.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BqQueryToRow.java#L49-L52

Added lines #L49 - L52 were not covered by tests

var queryTempProject = this.bqQuerySpec.getQueryTempProject();
var queryTempDataset = this.bqQuerySpec.getQueryTempDataset();

Check warning on line 55 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BqQueryToRow.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BqQueryToRow.java#L54-L55

Added lines #L54 - L55 were not covered by tests

if (queryTempProject != null && queryTempDataset != null) {
read = read.withQueryTempProjectAndDataset(queryTempProject, queryTempDataset);

Check warning on line 58 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BqQueryToRow.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BqQueryToRow.java#L58

Added line #L58 was not covered by tests
} else if (queryTempDataset != null) {
read = read.withQueryTempDataset(queryTempDataset);

Check warning on line 60 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BqQueryToRow.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BqQueryToRow.java#L60

Added line #L60 was not covered by tests
}

PCollection<TableRow> sourceRows = input.apply(bqQuerySpec.getReadDescription(), read);

Check warning on line 63 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BqQueryToRow.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BqQueryToRow.java#L63

Added line #L63 was not covered by tests

Schema beamSchema = sourceRows.getSchema();
Coder<Row> rowCoder = SchemaCoder.of(beamSchema);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# keep this sorted
com.google.cloud.teleport.v2.neo4j.model.validation.BigQuerySourceProjectDatasetValidator
com.google.cloud.teleport.v2.neo4j.model.validation.DuplicateAggregateFieldNameValidator
com.google.cloud.teleport.v2.neo4j.model.validation.DuplicateTextHeaderValidator
com.google.cloud.teleport.v2.neo4j.model.validation.InlineSourceDataValidator
Expand Down
Loading
Loading