Skip to content

Commit dbe4644

Browse files
committed
BigQuery: fix an issue with option propagation and refactor to future-proof
* We created a helper in BigQueryIO to create a JobConfigurationQuery capturing all options, but we had not yet propagated this cleanup into the Services abstraction or helper classes. Refactor BigQueryServices and BigQueryTableRowIterator to propagate the same configuration. Adds a new deprecated constructor to BigQueryTableRowIterator for backwards-compatibility. This fixes GoogleCloudPlatform#539.
1 parent efd33cc commit dbe4644

File tree

5 files changed

+52
-53
lines changed

5 files changed

+52
-53
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1075,7 +1075,7 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
10751075
public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
10761076
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
10771077
return new BigQueryReader(this, bqServices.getReaderFromQuery(
1078-
bqOptions, query.get(), executingProject.get(), flattenResults, useLegacySql));
1078+
bqOptions, createBasicQueryConfig(), executingProject.get()));
10791079
}
10801080

10811081
@Override
@@ -1152,11 +1152,12 @@ private void executeQuery(
11521152
.setProjectId(executingProject)
11531153
.setJobId(jobId);
11541154

1155+
// When changing options here, consider whether to change the defaults from
1156+
// #createBasicQueryConfig instead.
11551157
JobConfigurationQuery queryConfig = createBasicQueryConfig()
11561158
.setAllowLargeResults(true)
11571159
.setCreateDisposition("CREATE_IF_NEEDED")
11581160
.setDestinationTable(destinationTable)
1159-
.setPriority("BATCH")
11601161
.setWriteDisposition("WRITE_EMPTY");
11611162

11621163
jobService.startQueryJob(jobRef, queryConfig);
@@ -1167,9 +1168,12 @@ private void executeQuery(
11671168
}
11681169

11691170
private JobConfigurationQuery createBasicQueryConfig() {
1171+
// Due to deprecated functionality, if this function is updated
1172+
// then the similar code in BigQueryTableRowIterator#fromQuery should be updated.
11701173
return new JobConfigurationQuery()
1171-
.setQuery(query.get())
11721174
.setFlattenResults(flattenResults)
1175+
.setPriority("BATCH")
1176+
.setQuery(query.get())
11731177
.setUseLegacySql(useLegacySql);
11741178
}
11751179

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,7 @@ public interface BigQueryServices extends Serializable {
5858
* Returns a real, mock, or fake {@link BigQueryJsonReader} to query tables.
5959
*/
6060
BigQueryJsonReader getReaderFromQuery(
61-
BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten,
62-
@Nullable Boolean useLegacySql);
61+
BigQueryOptions bqOptions, JobConfigurationQuery queryConfig, String projectId);
6362

6463
/**
6564
* An interface for the Cloud BigQuery load service.

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,11 @@
3939
import com.google.cloud.dataflow.sdk.options.BigQueryOptions;
4040
import com.google.cloud.hadoop.util.ApiErrorExtractor;
4141
import com.google.common.annotations.VisibleForTesting;
42-
4342
import org.joda.time.Duration;
4443
import org.slf4j.Logger;
4544
import org.slf4j.LoggerFactory;
46-
4745
import java.io.IOException;
4846
import java.util.NoSuchElementException;
49-
5047
import javax.annotation.Nullable;
5148

5249
/**
@@ -83,9 +80,8 @@ public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableRef
8380

8481
@Override
8582
public BigQueryJsonReader getReaderFromQuery(
86-
BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten,
87-
@Nullable Boolean useLegacySql) {
88-
return BigQueryJsonReaderImpl.fromQuery(bqOptions, query, projectId, flatten, useLegacySql);
83+
BigQueryOptions bqOptions, JobConfigurationQuery queryConfig, String projectId) {
84+
return BigQueryJsonReaderImpl.fromQuery(bqOptions, queryConfig, projectId);
8985
}
9086

9187
@VisibleForTesting
@@ -521,14 +517,11 @@ private BigQueryJsonReaderImpl(BigQueryTableRowIterator iterator) {
521517

522518
private static BigQueryJsonReader fromQuery(
523519
BigQueryOptions bqOptions,
524-
String query,
525-
String projectId,
526-
@Nullable Boolean flattenResults,
527-
@Nullable Boolean useLegacySql) {
520+
JobConfigurationQuery queryConfig,
521+
String projectId) {
528522
return new BigQueryJsonReaderImpl(
529523
BigQueryTableRowIterator.fromQuery(
530-
query, projectId, Transport.newBigQueryClient(bqOptions).build(), flattenResults,
531-
useLegacySql));
524+
queryConfig, projectId, Transport.newBigQueryClient(bqOptions).build()));
532525
}
533526

534527
private static BigQueryJsonReader fromTable(

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,9 @@
4646
import com.google.common.base.MoreObjects;
4747
import com.google.common.collect.ImmutableList;
4848
import com.google.common.util.concurrent.Uninterruptibles;
49-
5049
import org.joda.time.Duration;
5150
import org.slf4j.Logger;
5251
import org.slf4j.LoggerFactory;
53-
5452
import java.io.IOException;
5553
import java.util.Collection;
5654
import java.util.Collections;
@@ -61,7 +59,6 @@
6159
import java.util.Objects;
6260
import java.util.Random;
6361
import java.util.concurrent.TimeUnit;
64-
6562
import javax.annotation.Nullable;
6663

6764
/**
@@ -73,6 +70,7 @@ public class BigQueryTableRowIterator implements AutoCloseable {
7370
@Nullable private TableReference ref;
7471
@Nullable private final String projectId;
7572
@Nullable private TableSchema schema;
73+
@Nullable private final JobConfigurationQuery queryConfig;
7674
private final Bigquery client;
7775
private String pageToken;
7876
private Iterator<TableRow> iteratorOverCurrentBatch;
@@ -89,25 +87,18 @@ public class BigQueryTableRowIterator implements AutoCloseable {
8987
// following interval to check the status of query execution job
9088
private static final Duration QUERY_COMPLETION_POLL_TIME = Duration.standardSeconds(1);
9189

92-
private final String query;
93-
// Whether to flatten query results.
94-
private final boolean flattenResults;
95-
// Whether to use the BigQuery legacy SQL dialect..
96-
private final boolean useLegacySql;
9790
// Temporary dataset used to store query results.
9891
private String temporaryDatasetId = null;
9992
// Temporary table used to store query results.
10093
private String temporaryTableId = null;
10194

10295
private BigQueryTableRowIterator(
103-
@Nullable TableReference ref, @Nullable String query, @Nullable String projectId,
104-
Bigquery client, boolean flattenResults, boolean useLegacySql) {
96+
@Nullable TableReference ref, @Nullable JobConfigurationQuery queryConfig,
97+
@Nullable String projectId, Bigquery client) {
10598
this.ref = ref;
106-
this.query = query;
99+
this.queryConfig = queryConfig;
107100
this.projectId = projectId;
108101
this.client = checkNotNull(client, "client");
109-
this.flattenResults = flattenResults;
110-
this.useLegacySql = useLegacySql;
111102
}
112103

113104
/**
@@ -116,7 +107,7 @@ private BigQueryTableRowIterator(
116107
public static BigQueryTableRowIterator fromTable(TableReference ref, Bigquery client) {
117108
checkNotNull(ref, "ref");
118109
checkNotNull(client, "client");
119-
return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client, true, true);
110+
return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client);
120111
}
121112

122113
/**
@@ -135,23 +126,39 @@ public static BigQueryTableRowIterator fromQuery(
135126
* Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the
136127
* specified query in the specified project.
137128
*/
129+
@Deprecated
138130
public static BigQueryTableRowIterator fromQuery(
139131
String query, String projectId, Bigquery client, @Nullable Boolean flattenResults,
140132
@Nullable Boolean useLegacySql) {
141133
checkNotNull(query, "query");
142134
checkNotNull(projectId, "projectId");
143135
checkNotNull(client, "client");
144-
return new BigQueryTableRowIterator(null, query, projectId, client,
145-
MoreObjects.firstNonNull(flattenResults, Boolean.TRUE),
146-
MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE));
136+
JobConfigurationQuery queryConfig = new JobConfigurationQuery()
137+
.setFlattenResults(MoreObjects.firstNonNull(flattenResults, Boolean.TRUE))
138+
.setPriority("BATCH")
139+
.setQuery(query)
140+
.setUseLegacySql(MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE));
141+
return new BigQueryTableRowIterator(null, queryConfig, projectId, client);
142+
}
143+
144+
/**
145+
* Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the
146+
* specified query in the specified project.
147+
*/
148+
public static BigQueryTableRowIterator fromQuery(
149+
JobConfigurationQuery queryConfig, String projectId, Bigquery client) {
150+
checkNotNull(queryConfig, "queryConfig");
151+
checkNotNull(projectId, "projectId");
152+
checkNotNull(client, "client");
153+
return new BigQueryTableRowIterator(null, queryConfig, projectId, client);
147154
}
148155

149156
/**
150157
* Opens the table for read.
151158
* @throws IOException on failure
152159
*/
153160
public void open() throws IOException, InterruptedException {
154-
if (query != null) {
161+
if (queryConfig != null) {
155162
ref = executeQueryAndWaitForCompletion();
156163
}
157164
// Get table schema.
@@ -401,15 +408,17 @@ private void deleteDataset(String datasetId) throws IOException, InterruptedExce
401408
*/
402409
private TableReference executeQueryAndWaitForCompletion()
403410
throws IOException, InterruptedException {
411+
checkState(projectId != null, "Cannot dryRun a query in unknown (null) project");
412+
checkState(queryConfig != null, "Cannot dryRun a null query");
404413
// Dry run query to get source table location
405414
Job dryRunJob = new Job()
406415
.setConfiguration(new JobConfiguration()
407-
.setQuery(new JobConfigurationQuery()
408-
.setQuery(query))
416+
.setQuery(queryConfig)
409417
.setDryRun(true));
410418
JobStatistics jobStats = executeWithBackOff(
411419
client.jobs().insert(projectId, dryRunJob),
412-
String.format("Error when trying to dry run query %s.", query)).getStatistics();
420+
String.format("Error when trying to dry run query %s.",
421+
queryConfig.toPrettyString())).getStatistics();
413422

414423
// Let BigQuery to pick default location if the query does not read any tables.
415424
String location = null;
@@ -428,14 +437,8 @@ private TableReference executeQueryAndWaitForCompletion()
428437
createDataset(temporaryDatasetId, location);
429438
Job job = new Job();
430439
JobConfiguration config = new JobConfiguration();
431-
JobConfigurationQuery queryConfig = new JobConfigurationQuery();
432440
config.setQuery(queryConfig);
433441
job.setConfiguration(config);
434-
queryConfig.setQuery(query);
435-
queryConfig.setAllowLargeResults(true);
436-
queryConfig.setFlattenResults(flattenResults);
437-
queryConfig.setUseLegacySql(useLegacySql);
438-
439442

440443
TableReference destinationTable = new TableReference();
441444
destinationTable.setProjectId(projectId);
@@ -445,13 +448,15 @@ private TableReference executeQueryAndWaitForCompletion()
445448

446449
Job queryJob = executeWithBackOff(
447450
client.jobs().insert(projectId, job),
448-
String.format("Error when trying to execute the job for query %s.", query));
451+
String.format("Error when trying to execute the job for query %s.",
452+
queryConfig.toPrettyString()));
449453
JobReference jobId = queryJob.getJobReference();
450454

451455
while (true) {
452456
Job pollJob = executeWithBackOff(
453457
client.jobs().get(projectId, jobId.getJobId()),
454-
String.format("Error when trying to get status of the job for query %s.", query));
458+
String.format("Error when trying to get status of the job for query %s.",
459+
queryConfig.toPrettyString()));
455460
JobStatus status = pollJob.getStatus();
456461
if (status.getState().equals("DONE")) {
457462
// Job is DONE, but did not necessarily succeed.
@@ -461,7 +466,9 @@ private TableReference executeQueryAndWaitForCompletion()
461466
} else {
462467
// There will be no temporary table to delete, so null out the reference.
463468
temporaryTableId = null;
464-
throw new IOException("Executing query " + query + " failed: " + error.getMessage());
469+
throw new IOException(
470+
String.format("Executing query %s failed: %s",
471+
queryConfig.toPrettyString(), error.getMessage()));
465472
}
466473
}
467474
Uninterruptibles.sleepUninterruptibly(

sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@
106106
import com.google.common.collect.ImmutableList;
107107
import com.google.common.collect.ImmutableMap;
108108
import com.google.common.collect.Lists;
109-
110109
import org.hamcrest.CoreMatchers;
111110
import org.hamcrest.Matchers;
112111
import org.junit.Assert;
@@ -122,7 +121,6 @@
122121
import org.mockito.Mock;
123122
import org.mockito.Mockito;
124123
import org.mockito.MockitoAnnotations;
125-
126124
import java.io.File;
127125
import java.io.FileFilter;
128126
import java.io.IOException;
@@ -135,8 +133,6 @@
135133
import java.util.NoSuchElementException;
136134
import java.util.Set;
137135

138-
import javax.annotation.Nullable;
139-
140136
/**
141137
* Tests for BigQueryIO.
142138
*/
@@ -187,8 +183,7 @@ public BigQueryJsonReader getReaderFromTable(
187183

188184
@Override
189185
public BigQueryJsonReader getReaderFromQuery(
190-
BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten,
191-
@Nullable Boolean useLegacySql) {
186+
BigQueryOptions bqOptions, JobConfigurationQuery queryConfig, String projectId) {
192187
return new FakeBigQueryReader(jsonTableRowReturns);
193188
}
194189

@@ -1749,3 +1744,4 @@ public boolean accept(File pathname) {
17491744
}}).length);
17501745
}
17511746
}
1747+

0 commit comments

Comments
 (0)