Skip to content

Commit fc5fee2

Browse files
authored
Merge pull request GoogleCloudPlatform#540 from dhalperi/bigquery-direct-standard-sql
BigQuery: fix an issue with option propagation and refactor to future-proof
2 parents 9b9ee0b + 9c59d78 commit fc5fee2

File tree

6 files changed

+71
-70
lines changed

6 files changed

+71
-70
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1075,7 +1075,7 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
10751075
public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
10761076
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
10771077
return new BigQueryReader(this, bqServices.getReaderFromQuery(
1078-
bqOptions, query.get(), executingProject.get(), flattenResults, useLegacySql));
1078+
bqOptions, executingProject.get(), createBasicQueryConfig()));
10791079
}
10801080

10811081
@Override
@@ -1152,6 +1152,8 @@ private void executeQuery(
11521152
.setProjectId(executingProject)
11531153
.setJobId(jobId);
11541154

1155+
// When changing options here, consider whether to change the defaults from
1156+
// #createBasicQueryConfig instead.
11551157
JobConfigurationQuery queryConfig = createBasicQueryConfig()
11561158
.setAllowLargeResults(true)
11571159
.setCreateDisposition("CREATE_IF_NEEDED")
@@ -1167,9 +1169,11 @@ private void executeQuery(
11671169
}
11681170

11691171
private JobConfigurationQuery createBasicQueryConfig() {
1172+
// Due to deprecated functionality, if this function is updated
1173+
// then the similar code in BigQueryTableRowIterator#fromQuery should be updated.
11701174
return new JobConfigurationQuery()
1171-
.setQuery(query.get())
11721175
.setFlattenResults(flattenResults)
1176+
.setQuery(query.get())
11731177
.setUseLegacySql(useLegacySql);
11741178
}
11751179

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,7 @@ public interface BigQueryServices extends Serializable {
5858
* Returns a real, mock, or fake {@link BigQueryJsonReader} to query tables.
5959
*/
6060
BigQueryJsonReader getReaderFromQuery(
61-
BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten,
62-
@Nullable Boolean useLegacySql);
61+
BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig);
6362

6463
/**
6564
* An interface for the Cloud BigQuery load service.

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,11 @@
3939
import com.google.cloud.dataflow.sdk.options.BigQueryOptions;
4040
import com.google.cloud.hadoop.util.ApiErrorExtractor;
4141
import com.google.common.annotations.VisibleForTesting;
42-
4342
import org.joda.time.Duration;
4443
import org.slf4j.Logger;
4544
import org.slf4j.LoggerFactory;
46-
4745
import java.io.IOException;
4846
import java.util.NoSuchElementException;
49-
5047
import javax.annotation.Nullable;
5148

5249
/**
@@ -83,9 +80,8 @@ public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableRef
8380

8481
@Override
8582
public BigQueryJsonReader getReaderFromQuery(
86-
BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten,
87-
@Nullable Boolean useLegacySql) {
88-
return BigQueryJsonReaderImpl.fromQuery(bqOptions, query, projectId, flatten, useLegacySql);
83+
BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) {
84+
return BigQueryJsonReaderImpl.fromQuery(bqOptions, projectId, queryConfig);
8985
}
9086

9187
@VisibleForTesting
@@ -520,15 +516,10 @@ private BigQueryJsonReaderImpl(BigQueryTableRowIterator iterator) {
520516
}
521517

522518
private static BigQueryJsonReader fromQuery(
523-
BigQueryOptions bqOptions,
524-
String query,
525-
String projectId,
526-
@Nullable Boolean flattenResults,
527-
@Nullable Boolean useLegacySql) {
519+
BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) {
528520
return new BigQueryJsonReaderImpl(
529521
BigQueryTableRowIterator.fromQuery(
530-
query, projectId, Transport.newBigQueryClient(bqOptions).build(), flattenResults,
531-
useLegacySql));
522+
queryConfig, projectId, Transport.newBigQueryClient(bqOptions).build()));
532523
}
533524

534525
private static BigQueryJsonReader fromTable(

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java

Lines changed: 38 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,9 @@
4646
import com.google.common.base.MoreObjects;
4747
import com.google.common.collect.ImmutableList;
4848
import com.google.common.util.concurrent.Uninterruptibles;
49-
5049
import org.joda.time.Duration;
5150
import org.slf4j.Logger;
5251
import org.slf4j.LoggerFactory;
53-
5452
import java.io.IOException;
5553
import java.util.Collection;
5654
import java.util.Collections;
@@ -61,7 +59,6 @@
6159
import java.util.Objects;
6260
import java.util.Random;
6361
import java.util.concurrent.TimeUnit;
64-
6562
import javax.annotation.Nullable;
6663

6764
/**
@@ -73,6 +70,7 @@ public class BigQueryTableRowIterator implements AutoCloseable {
7370
@Nullable private TableReference ref;
7471
@Nullable private final String projectId;
7572
@Nullable private TableSchema schema;
73+
@Nullable private final JobConfigurationQuery queryConfig;
7674
private final Bigquery client;
7775
private String pageToken;
7876
private Iterator<TableRow> iteratorOverCurrentBatch;
@@ -89,25 +87,18 @@ public class BigQueryTableRowIterator implements AutoCloseable {
8987
// following interval to check the status of query execution job
9088
private static final Duration QUERY_COMPLETION_POLL_TIME = Duration.standardSeconds(1);
9189

92-
private final String query;
93-
// Whether to flatten query results.
94-
private final boolean flattenResults;
95-
// Whether to use the BigQuery legacy SQL dialect.
96-
private final boolean useLegacySql;
9790
// Temporary dataset used to store query results.
9891
private String temporaryDatasetId = null;
9992
// Temporary table used to store query results.
10093
private String temporaryTableId = null;
10194

10295
private BigQueryTableRowIterator(
103-
@Nullable TableReference ref, @Nullable String query, @Nullable String projectId,
104-
Bigquery client, boolean flattenResults, boolean useLegacySql) {
96+
@Nullable TableReference ref, @Nullable JobConfigurationQuery queryConfig,
97+
@Nullable String projectId, Bigquery client) {
10598
this.ref = ref;
106-
this.query = query;
99+
this.queryConfig = queryConfig;
107100
this.projectId = projectId;
108101
this.client = checkNotNull(client, "client");
109-
this.flattenResults = flattenResults;
110-
this.useLegacySql = useLegacySql;
111102
}
112103

113104
/**
@@ -116,7 +107,7 @@ private BigQueryTableRowIterator(
116107
public static BigQueryTableRowIterator fromTable(TableReference ref, Bigquery client) {
117108
checkNotNull(ref, "ref");
118109
checkNotNull(client, "client");
119-
return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client, true, true);
110+
return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client);
120111
}
121112

122113
/**
@@ -135,23 +126,39 @@ public static BigQueryTableRowIterator fromQuery(
135126
* Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the
136127
* specified query in the specified project.
137128
*/
129+
@Deprecated
138130
public static BigQueryTableRowIterator fromQuery(
139131
String query, String projectId, Bigquery client, @Nullable Boolean flattenResults,
140132
@Nullable Boolean useLegacySql) {
141133
checkNotNull(query, "query");
142134
checkNotNull(projectId, "projectId");
143135
checkNotNull(client, "client");
144-
return new BigQueryTableRowIterator(null, query, projectId, client,
145-
MoreObjects.firstNonNull(flattenResults, Boolean.TRUE),
146-
MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE));
136+
JobConfigurationQuery queryConfig = new JobConfigurationQuery()
137+
.setFlattenResults(MoreObjects.firstNonNull(flattenResults, Boolean.TRUE))
138+
.setPriority("BATCH")
139+
.setQuery(query)
140+
.setUseLegacySql(MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE));
141+
return new BigQueryTableRowIterator(null, queryConfig, projectId, client);
142+
}
143+
144+
/**
145+
* Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the
146+
* specified query in the specified project.
147+
*/
148+
public static BigQueryTableRowIterator fromQuery(
149+
JobConfigurationQuery queryConfig, String projectId, Bigquery client) {
150+
checkNotNull(queryConfig, "queryConfig");
151+
checkNotNull(projectId, "projectId");
152+
checkNotNull(client, "client");
153+
return new BigQueryTableRowIterator(null, queryConfig, projectId, client);
147154
}
148155

149156
/**
150157
* Opens the table for read.
151158
* @throws IOException on failure
152159
*/
153160
public void open() throws IOException, InterruptedException {
154-
if (query != null) {
161+
if (queryConfig != null) {
155162
ref = executeQueryAndWaitForCompletion();
156163
}
157164
// Get table schema.
@@ -401,15 +408,17 @@ private void deleteDataset(String datasetId) throws IOException, InterruptedExce
401408
*/
402409
private TableReference executeQueryAndWaitForCompletion()
403410
throws IOException, InterruptedException {
411+
checkState(projectId != null, "Unable to execute a query without a configured project id");
412+
checkState(queryConfig != null, "Unable to execute a query without a configured query");
404413
// Dry run query to get source table location
405414
Job dryRunJob = new Job()
406415
.setConfiguration(new JobConfiguration()
407-
.setQuery(new JobConfigurationQuery()
408-
.setQuery(query))
416+
.setQuery(queryConfig)
409417
.setDryRun(true));
410418
JobStatistics jobStats = executeWithBackOff(
411419
client.jobs().insert(projectId, dryRunJob),
412-
String.format("Error when trying to dry run query %s.", query)).getStatistics();
420+
String.format("Error when trying to dry run query %s.",
421+
queryConfig.toPrettyString())).getStatistics();
413422

414423
// Let BigQuery pick the default location if the query does not read any tables.
415424
String location = null;
@@ -428,30 +437,27 @@ private TableReference executeQueryAndWaitForCompletion()
428437
createDataset(temporaryDatasetId, location);
429438
Job job = new Job();
430439
JobConfiguration config = new JobConfiguration();
431-
JobConfigurationQuery queryConfig = new JobConfigurationQuery();
432440
config.setQuery(queryConfig);
433441
job.setConfiguration(config);
434-
queryConfig.setQuery(query);
435-
queryConfig.setAllowLargeResults(true);
436-
queryConfig.setFlattenResults(flattenResults);
437-
queryConfig.setUseLegacySql(useLegacySql);
438-
439442

440443
TableReference destinationTable = new TableReference();
441444
destinationTable.setProjectId(projectId);
442445
destinationTable.setDatasetId(temporaryDatasetId);
443446
destinationTable.setTableId(temporaryTableId);
444447
queryConfig.setDestinationTable(destinationTable);
448+
queryConfig.setAllowLargeResults(Boolean.TRUE);
445449

446450
Job queryJob = executeWithBackOff(
447451
client.jobs().insert(projectId, job),
448-
String.format("Error when trying to execute the job for query %s.", query));
452+
String.format("Error when trying to execute the job for query %s.",
453+
queryConfig.toPrettyString()));
449454
JobReference jobId = queryJob.getJobReference();
450455

451456
while (true) {
452457
Job pollJob = executeWithBackOff(
453458
client.jobs().get(projectId, jobId.getJobId()),
454-
String.format("Error when trying to get status of the job for query %s.", query));
459+
String.format("Error when trying to get status of the job for query %s.",
460+
queryConfig.toPrettyString()));
455461
JobStatus status = pollJob.getStatus();
456462
if (status.getState().equals("DONE")) {
457463
// Job is DONE, but did not necessarily succeed.
@@ -461,7 +467,9 @@ private TableReference executeQueryAndWaitForCompletion()
461467
} else {
462468
// There will be no temporary table to delete, so null out the reference.
463469
temporaryTableId = null;
464-
throw new IOException("Executing query " + query + " failed: " + error.getMessage());
470+
throw new IOException(
471+
String.format("Executing query %s failed: %s",
472+
queryConfig.toPrettyString(), error.getMessage()));
465473
}
466474
}
467475
Uninterruptibles.sleepUninterruptibly(

sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@
106106
import com.google.common.collect.ImmutableList;
107107
import com.google.common.collect.ImmutableMap;
108108
import com.google.common.collect.Lists;
109-
110109
import org.hamcrest.CoreMatchers;
111110
import org.hamcrest.Matchers;
112111
import org.junit.Assert;
@@ -122,7 +121,6 @@
122121
import org.mockito.Mock;
123122
import org.mockito.Mockito;
124123
import org.mockito.MockitoAnnotations;
125-
126124
import java.io.File;
127125
import java.io.FileFilter;
128126
import java.io.IOException;
@@ -135,8 +133,6 @@
135133
import java.util.NoSuchElementException;
136134
import java.util.Set;
137135

138-
import javax.annotation.Nullable;
139-
140136
/**
141137
* Tests for BigQueryIO.
142138
*/
@@ -187,8 +183,7 @@ public BigQueryJsonReader getReaderFromTable(
187183

188184
@Override
189185
public BigQueryJsonReader getReaderFromQuery(
190-
BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten,
191-
@Nullable Boolean useLegacySql) {
186+
BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) {
192187
return new FakeBigQueryReader(jsonTableRowReturns);
193188
}
194189

@@ -1749,3 +1744,4 @@ public boolean accept(File pathname) {
17491744
}}).length);
17501745
}
17511746
}
1747+

sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -258,14 +258,18 @@ public void testReadFromQueryNoTables() throws IOException, InterruptedException
258258

259259
// Mock job polling.
260260
JobStatus status = new JobStatus().setState("DONE");
261-
TableReference tableRef =
262-
new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
263-
JobConfigurationQuery queryConfig = new JobConfigurationQuery().setDestinationTable(tableRef);
261+
JobConfigurationQuery resultQueryConfig =
262+
new JobConfigurationQuery().setDestinationTable(
263+
new TableReference()
264+
.setProjectId("project")
265+
.setDatasetId("tempdataset")
266+
.setTableId("temptable")
267+
);
264268
Job getJob =
265269
new Job()
266270
.setJobReference(new JobReference())
267271
.setStatus(status)
268-
.setConfiguration(new JobConfiguration().setQuery(queryConfig));
272+
.setConfiguration(new JobConfiguration().setQuery(resultQueryConfig));
269273
when(mockJobsGet.execute()).thenReturn(getJob);
270274

271275
// Mock table schema fetch.
@@ -281,8 +285,9 @@ public void testReadFromQueryNoTables() throws IOException, InterruptedException
281285
String query = String.format(
282286
"SELECT \"Arthur\" as name, 42 as count, \"%s\" as photo",
283287
photoBytesEncoded);
288+
JobConfigurationQuery queryConfig = new JobConfigurationQuery().setQuery(query);
284289
try (BigQueryTableRowIterator iterator =
285-
BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null, null)) {
290+
BigQueryTableRowIterator.fromQuery(queryConfig, "project", mockClient)) {
286291
iterator.open();
287292
assertTrue(iterator.advance());
288293
TableRow row = iterator.getCurrent();
@@ -317,7 +322,7 @@ public void testReadFromQueryNoTables() throws IOException, InterruptedException
317322
verify(mockTablesDelete).execute();
318323
// Table data read.
319324
verify(mockClient).tabledata();
320-
verify(mockTabledata).list("project", "dataset", "table");
325+
verify(mockTabledata).list("project", "tempdataset", "temptable");
321326
verify(mockTabledataList).execute();
322327
}
323328

@@ -334,18 +339,16 @@ public void testQueryFailed() throws IOException {
334339
when(mockJobsInsert.execute()).thenThrow(exception, exception, exception, exception);
335340

336341
String query = "NOT A QUERY";
342+
JobConfigurationQuery queryConfig = new JobConfigurationQuery().setQuery(query);
337343
try (BigQueryTableRowIterator iterator =
338-
BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null, null)) {
339-
340-
try {
341-
iterator.open();
342-
fail();
343-
} catch (Exception expected) {
344-
// Verify message explains cause and reports the query.
345-
assertThat(expected.getMessage(), containsString("Error"));
346-
assertThat(expected.getMessage(), containsString(query));
347-
assertThat(expected.getCause().getMessage(), containsString(errorReason));
348-
}
344+
BigQueryTableRowIterator.fromQuery(queryConfig, "project", mockClient)) {
345+
iterator.open();
346+
fail();
347+
} catch (Exception expected) {
348+
// Verify message explains cause and reports the query.
349+
assertThat(expected.getMessage(), containsString("Error"));
350+
assertThat(expected.getMessage(), containsString(query));
351+
assertThat(expected.getCause().getMessage(), containsString(errorReason));
349352
}
350353

351354
// Job inserted to run the query, then polled once.

0 commit comments

Comments
 (0)