
Commit

SDC-8731. Google Pub/Sub - "the service was unable to fulfil your request"

- Refactor of Google Cloud Pub/Sub and BigQuery for the latest client library versions.
-- Will not pass unit tests yet; the tests have not been refactored. Putting this up as a placeholder.
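
For context, a minimal sketch of the job-based query flow this refactor moves to (assuming an already-built com.google.cloud.bigquery.BigQuery client; the helper name, page size, and simplified error handling are illustrative, not part of this change):

    // Uses com.google.cloud.bigquery.* types; see the imports in the diff below.
    static TableResult runQuerySketch(BigQuery bigquery, String query, long pageSize)
        throws InterruptedException {
      QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query)
          .setUseLegacySql(false)
          .build();
      // A random job ID makes the create call safe to retry.
      JobId jobId = JobId.of(UUID.randomUUID().toString());
      Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
      // Wait for completion (the delegate in this change polls isDone() with its own timeout instead).
      queryJob = queryJob.waitFor();
      if (queryJob == null || queryJob.getStatus().getError() != null) {
        throw new IllegalStateException("query job failed or no longer exists");
      }
      TableResult result = queryJob.getQueryResults(BigQuery.QueryResultsOption.pageSize(pageSize));
      for (FieldValueList row : result.iterateAll()) {
        // convert each row to an SDC record here
      }
      return result;
    }

In the source stage, the page size passed to QueryResultsOption.pageSize comes from the configured batch size.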

Change-Id: I8975a12d92b544cf53cb9f72b90894c75d789313
Signed-off-by: Keith Burns <keith@streamsets.com>
Reviewed-on: https://review.streamsets.net/14974
Tested-by: StreamSets CI <streamsets-ci-spam@streamsets.com>
Reviewed-by: Jeff Evans <jeff@streamsets.com>
Keith Burns committed Jul 2, 2018
1 parent 2ab5446 commit d3439d9
Showing 7 changed files with 268 additions and 148 deletions.
6 changes: 4 additions & 2 deletions google-cloud-lib/pom.xml
@@ -35,6 +35,8 @@
<properties>
<gcp-beta.version>0.25.0-beta</gcp-beta.version>
<gcp-gcs.version>1.3.1</gcp-gcs.version>
<gcp-ps.version>1.32.0</gcp-ps.version>
<gcp-bigquery.version>1.33.0</gcp-bigquery.version>
<netty.version>4.1.14.Final</netty.version>
</properties>

@@ -43,12 +45,12 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquery</artifactId>
<version>${gcp-beta.version}</version>
<version>${gcp-bigquery.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>${gcp-beta.version}</version>
<version>${gcp-ps.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
@@ -15,21 +15,28 @@
*/
package com.streamsets.pipeline.stage.bigquery.lib;

import com.google.api.services.bigquery.Bigquery.Jobs.GetQueryResults;
import com.google.auth.Credentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQuery.QueryResultsOption;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValue;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.QueryRequest;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryResponse;
import com.google.cloud.bigquery.QueryResult;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.TableResult;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.streamsets.pipeline.api.Field;
import com.streamsets.pipeline.api.StageException;
import com.streamsets.pipeline.api.impl.Utils;
import com.streamsets.pipeline.lib.util.ThreadUtil;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -89,7 +96,7 @@ public class BigQueryDelegate {
private final Map<FieldValue.Attribute, BiFunction<com.google.cloud.bigquery.Field, FieldValue, Field>> transforms =
ImmutableMap.of(
FieldValue.Attribute.PRIMITIVE, this::fromPrimitiveField,
FieldValue.Attribute.RECORD, (s, v) -> Field.create(fieldsToMap(s.getFields(), v.getRecordValue())),
FieldValue.Attribute.RECORD, (s, v) -> Field.create(fieldsToMap(s.getSubFields(), v.getRecordValue())),
FieldValue.Attribute.REPEATED, (s, v) -> Field.create(fromRepeatedField(s, v.getRepeatedValue()))
);

@@ -113,9 +120,9 @@ public class BigQueryDelegate {
this.clock = clock;

if (useLegacySql) {
asRecordFieldTypeFunction = field -> LEGACY_SQL_TYPES.get(field.getType().getValue());
asRecordFieldTypeFunction = field -> LEGACY_SQL_TYPES.get(field.getType());
} else {
asRecordFieldTypeFunction = field -> STANDARD_SQL_TYPES.get(field.getType().getValue().getStandardType());
asRecordFieldTypeFunction = field -> STANDARD_SQL_TYPES.get(field.getType().getStandardType());
}
}

@@ -133,62 +140,94 @@ public BigQueryDelegate(BigQuery bigquery, boolean useLegacySql) {

/**
* Executes a query request and returns the results. A timeout is required to avoid
* waiting for an indeterminate amount of time. If the query fails to complete within the timeout it is aborted.
* waiting for an indeterminate amount of time. If the query fails to complete within the
* timeout it is aborted.
*
* @param queryRequest request describing the query to execute
* @param queryConfig request describing the query to execute
* @param timeout maximum amount of time to wait for job completion before attempting to cancel
* @return result of the query.
* @throws StageException if the query does not complete within the requested timeout or there is an error
* executing the query.
* @throws StageException if the query does not complete within the requested timeout or
* there is an error executing the query.
*/
public QueryResult runQuery(QueryRequest queryRequest, long timeout) throws
public TableResult runQuery(QueryJobConfiguration queryConfig, long timeout, long pageSize) throws
StageException {
checkArgument(timeout >= 1000, "Timeout must be at least one second.");
Instant maxTime = Instant.now().plusMillis(timeout);

QueryResponse response = bigquery.query(queryRequest);
while(!response.jobCompleted()) {
// Create a job ID so that we can safely retry.
JobId jobId = JobId.of(UUID.randomUUID().toString());
JobInfo jobInfo = JobInfo.newBuilder(queryConfig).setJobId(jobId).build();
Job queryJob = bigquery.create(jobInfo);

// Check for errors
if (queryJob == null) {
LOG.error("Job no longer exists: {}", jobInfo);
throw new RuntimeException("Job no longer exists: "+jobInfo);
} else if (queryJob.getStatus().getError() != null) {
BigQueryError error = queryJob.getStatus().getError();
LOG.error("Query Job execution error: {}", error);
throw new StageException(Errors.BIGQUERY_02, error);
}

//Should consider using .waitFor(RetryOption.totalTimeout())
while(!queryJob.isDone()) {
if (Instant.now(clock).isAfter(maxTime) || !ThreadUtil.sleep(100)) {
if (bigquery.cancel(response.getJobId())) {
LOG.info("Job {} cancelled successfully.", response.getJobId());
if (bigquery.cancel(queryJob.getJobId())) {
LOG.info("Job {} cancelled successfully.", queryJob.getJobId());
} else {
LOG.warn("Job {} not found", response.getJobId());
LOG.warn("Job {} not found", queryJob.getJobId());
}
throw new StageException(Errors.BIGQUERY_00);
}
response = bigquery.getQueryResults(response.getJobId());
}

if (response.hasErrors()) {
String errorMsg = response.getExecutionErrors().get(0).toString();

if (queryJob.getStatus().getError() != null) {
String errorMsg = queryJob.getStatus().getError().toString();
throw new StageException(Errors.BIGQUERY_02, errorMsg);
}

// Get the results.
return response.getResult();
TableResult result = null;
try {
result = queryJob.getQueryResults(QueryResultsOption.pageSize(pageSize));
} catch (InterruptedException e) {
String errorMsg = e.getMessage();
throw new StageException(Errors.BIGQUERY_02, errorMsg);
}

return result;
}

/**
* Returns the SDC record {@link Field} type mapped from a BigQuery {@link com.google.cloud.bigquery.Field} type.
* Returns the SDC record {@link Field} type mapped from a
* BigQuery {@link com.google.cloud.bigquery.Field} type.
*
* @param field A BigQuery {@link com.google.cloud.bigquery.Field}
* @return SDC record {@link Field.Type}
*/
public Field.Type asRecordFieldType(com.google.cloud.bigquery.Field field) {
Field.Type type = asRecordFieldTypeFunction.apply(field);
return checkNotNull(type, Utils.format("Unsupported type '{}'", field.getType().getValue()));
return checkNotNull(type, Utils.format("Unsupported type '{}'", field.getType()));
}

/**
* Converts a list of BigQuery fields to SDC Record fields. The provided parameters must have matching lengths.
* If not, an unchecked exception will be thrown. This method is called when the resulting container type
* should be a {@link Field.Type#LIST_MAP}
* Converts a list of BigQuery fields to SDC Record fields.
* The provided parameters must have matching lengths.
*
* If not, an unchecked exception will be thrown. This method is called when the resulting
* container type should be a {@link Field.Type#LIST_MAP}
*
* @param schema List of {@link com.google.cloud.bigquery.Field} representing the schema at the current level of
* nesting. For example, if processing a {@link LegacySQLTypeName#RECORD} or {@link StandardSQLTypeName#STRUCT}
* this would only include the fields for that particular data structure and not the entire result set.
* @param values List of {@link FieldValue} representing the values to set in the generated fields.
* @return Specifically, a LinkedHashMap as the return value of this method is expected to be used to create a
* @param schema List of {@link com.google.cloud.bigquery.Field} representing the schema
* at the current level of nesting. For example, if processing a
* {@link LegacySQLTypeName#RECORD} or {@link StandardSQLTypeName#STRUCT}
* this would only include the fields for that particular data structure and not the entire
* result set.
*
* @param values List of {@link FieldValue} representing the values to set in the generated
* fields.
* @return Specifically, a LinkedHashMap as the return value of this method is expected to be
* used to create a
* {@link Field.Type#LIST_MAP} field.
*/
public LinkedHashMap<String, Field> fieldsToMap( // NOSONAR
@@ -209,7 +248,10 @@ public LinkedHashMap<String, Field> fieldsToMap( // NOSONAR
if (value.getAttribute().equals(FieldValue.Attribute.PRIMITIVE)) {
root.put(field.getName(), fromPrimitiveField(field, value));
} else if (value.getAttribute().equals(FieldValue.Attribute.RECORD)) {
root.put(field.getName(), Field.create(fieldsToMap(field.getFields(), value.getRecordValue())));
root.put(
field.getName(),
Field.create(fieldsToMap(field.getSubFields(), value.getRecordValue()))
);
} else if (value.getAttribute().equals(FieldValue.Attribute.REPEATED)) {
root.put(field.getName(), Field.create(fromRepeatedField(field, value.getRepeatedValue())));
}
@@ -218,23 +260,31 @@
}

/**
* Repeated fields are simply fields that may appear more than once. In SDC we will represent them as a list
* field. For example a repeated field of type RECORD would be a {@link Field.Type#LIST} of
* {@link Field.Type#LIST_MAP} and a repeated field of type STRING would be a {@link Field.Type#LIST} of
* {@link Field.Type#STRING}.
* Repeated fields are simply fields that may appear more than once. In SDC we will
* represent them as a list field. For example a repeated field of type RECORD would be a
* {@link Field.Type#LIST} of
* {@link Field.Type#LIST_MAP} and a repeated field of type STRING would be
* a {@link Field.Type#LIST} of {@link Field.Type#STRING}.
*
* @param schema The field metadata for the repeated field
* @param repeatedValue a list of individual field values that represent the repeated field
* @return a list of SDC record fields. Intended to be used for creating a {@link Field.Type#LIST} {@link Field}
* @return a list of SDC record fields.
* Intended to be used for creating a {@link Field.Type#LIST} {@link Field}
*/
public List<Field> fromRepeatedField(com.google.cloud.bigquery.Field schema, List<FieldValue> repeatedValue) {
public List<Field> fromRepeatedField(com.google.cloud.bigquery.Field schema,
List<FieldValue> repeatedValue) {

if (repeatedValue.isEmpty()) {
return Collections.emptyList();
}
FieldValue.Attribute repeatedFieldType = repeatedValue.get(0).getAttribute();

BiFunction<com.google.cloud.bigquery.Field, FieldValue, Field> transform = transforms.get(repeatedFieldType);
return repeatedValue.stream().map( v -> transform.apply(schema, v)).collect(Collectors.toList());
BiFunction<com.google.cloud.bigquery.Field, FieldValue, Field> transform =
transforms.get(repeatedFieldType);
return repeatedValue
.stream()
.map( v -> transform.apply(schema, v))
.collect(Collectors.toList());
}

public Field fromPrimitiveField(com.google.cloud.bigquery.Field field, FieldValue value) {
@@ -18,9 +18,10 @@
import com.google.auth.Credentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.FieldValue;
import com.google.cloud.bigquery.QueryRequest;
import com.google.cloud.bigquery.QueryResult;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableResult;
import com.google.common.annotations.VisibleForTesting;
import com.streamsets.pipeline.api.BatchMaker;
import com.streamsets.pipeline.api.Field;
@@ -58,7 +59,7 @@ public class BigQuerySource extends BaseSource {
private final BigQuerySourceConfig conf;

private BigQueryDelegate delegate;
private QueryResult result;
private TableResult result;
private Schema schema;
private int totalCount;

@@ -100,15 +101,14 @@ BigQuery getBigQuery(Credentials credentials) {
@Override
public String produce(String lastSourceOffset, int maxBatchSize, BatchMaker batchMaker) throws StageException {
String sourceOffset = lastSourceOffset;
long pageSize = (long) Math.min(conf.maxBatchSize, maxBatchSize);

if (result == null) {
QueryRequest queryRequest = QueryRequest.newBuilder(conf.query)
.setPageSize((long) Math.min(conf.maxBatchSize, maxBatchSize))
QueryJobConfiguration queryRequest = QueryJobConfiguration.newBuilder(conf.query)
.setUseQueryCache(conf.useQueryCache)
.setUseLegacySql(conf.useLegacySql)
.build();

result = runQuery(queryRequest);
result = runQuery(queryRequest, pageSize);
schema = result.getSchema();
totalCount = 0;
LOG.debug("Will process a total of {} rows.", result.getTotalRows());
@@ -117,7 +117,7 @@ public String produce(String lastSourceOffset, int maxBatchSize, BatchMaker batc
int count = 0;

// process one page (batch)
for (List<FieldValue> row : result.getValues()) {
for (FieldValueList row : result.iterateAll()) {
sourceOffset = Utils.format("projectId:{}::rowNum:{}", conf.credentials.projectId, count);
Record r = getContext().createRecord(sourceOffset);

@@ -145,7 +145,7 @@ public String produce(String lastSourceOffset, int maxBatchSize, BatchMaker batc
}

@VisibleForTesting
QueryResult runQuery(QueryRequest queryRequest) throws StageException {
return delegate.runQuery(queryRequest, conf.timeout * 1000);
TableResult runQuery(QueryJobConfiguration queryRequest, long pageSize) throws StageException {
return delegate.runQuery(queryRequest, conf.timeout * 1000, pageSize);
}
}
@@ -21,6 +21,7 @@
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.base.Throwables;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import com.streamsets.pipeline.api.Batch;
@@ -97,12 +98,12 @@ protected List<ConfigIssue> init() {
generatorFactory = conf.dataFormatConfig.getDataGeneratorFactory();
}

TopicName topic = TopicName.create(conf.credentials.projectId, conf.topicId);
ProjectTopicName topic = ProjectTopicName.of(conf.credentials.projectId, conf.topicId);

conf.credentials.getCredentialsProvider(getContext(), issues).ifPresent(p -> credentialsProvider = p);

try {
publisher = Publisher.defaultBuilder(topic).setCredentialsProvider(credentialsProvider).build();
publisher = Publisher.newBuilder(topic).setCredentialsProvider(credentialsProvider).build();
} catch (IOException e) {
LOG.error(Errors.PUBSUB_07.getMessage(), conf.topicId, e.toString(), e);
issues.add(getContext().createConfigIssue(
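
For the Pub/Sub side of the refactor shown above, a rough sketch of building and using a publisher with the new builder API (the project ID, topic ID, payload, and helper name are placeholders; ApiFuture comes from com.google.api.core and CredentialsProvider from com.google.api.gax.core, as in the stage code above):

    static void publishSketch(CredentialsProvider credentialsProvider) throws Exception {
      ProjectTopicName topic = ProjectTopicName.of("my-project", "my-topic");
      Publisher publisher = Publisher.newBuilder(topic)
          .setCredentialsProvider(credentialsProvider)
          .build();
      try {
        PubsubMessage message = PubsubMessage.newBuilder()
            .setData(ByteString.copyFromUtf8("hello"))
            .build();
        // publish() is asynchronous; the returned future resolves to the server-assigned message ID.
        ApiFuture<String> messageId = publisher.publish(message);
        messageId.get();
      } finally {
        publisher.shutdown();
      }
    }

The diff above swaps TopicName.create and Publisher.defaultBuilder for ProjectTopicName.of and Publisher.newBuilder, which is what this sketch follows.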