Skip to content

Commit

Permalink
Add support for BigQuery resumable uploads via a write channel
Browse files Browse the repository at this point in the history
- Move BlobWriteChannel and BlobReadChannel to core module
- Rename BlobWriteChannel and BlobReadChannel to WriteChannel and ReadChannel
- Add abstract class BaseWriteChannel implementing entity-agnostic channel functionality
- Add BlobWriteChannel and BlobReadChannel implementation to gcloud-java-storage
- Add LoadConfiguration and modify LoadJobInfo to take configuration as a parameter
- Add BigQuery.writer method to return a writer given LoadConfiguration
- Add BigQueryRpc.open and .write methods to implement write channel
- Add TableDataWriteChannel class to support bigquery resumable streaming inserts
- Add unit and integration tests
- Update bigquery example with load-data action
  • Loading branch information
mziccard committed Jan 12, 2016
1 parent b5dd44a commit be1d946
Show file tree
Hide file tree
Showing 34 changed files with 2,114 additions and 1,116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -662,4 +662,12 @@ Page<List<FieldValue>> listTableData(TableId tableId, TableDataListOption... opt
* @throws BigQueryException upon failure
*/
QueryResponse getQueryResults(JobId job, QueryResultsOption... options) throws BigQueryException;

/**
* Returns a channel to write data to be inserted into a BigQuery table. Data format and other
* options can be configured using the {@link LoadConfiguration} parameter.
*
* @throws BigQueryException upon failure
*/
TableDataWriteChannel writer(LoadConfiguration loadConfiguration);
}
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,10 @@ private static QueryResult.Builder transformQueryResults(JobId jobId, List<Table
.results(transformTableData(rowsPb));
}

public TableDataWriteChannel writer(LoadConfiguration loadConfiguration) {
return new TableDataWriteChannel(options(), setProjectId(loadConfiguration));
}

private Map<BigQueryRpc.Option, ?> optionMap(Option... options) {
Map<BigQueryRpc.Option, Object> optionMap = Maps.newEnumMap(BigQueryRpc.Option.class);
for (Option option : options) {
Expand Down Expand Up @@ -698,8 +702,7 @@ public TableId apply(TableId tableId) {
if (job instanceof LoadJobInfo) {
LoadJobInfo loadJob = (LoadJobInfo) job;
LoadJobInfo.Builder loadBuilder = loadJob.toBuilder();
loadBuilder.destinationTable(setProjectId(loadJob.destinationTable()));
return loadBuilder.build();
return loadBuilder.configuration(setProjectId(loadJob.configuration())).build();
}
return job;
}
Expand All @@ -711,4 +714,10 @@ private QueryRequest setProjectId(QueryRequest request) {
}
return builder.build();
}

private LoadConfiguration setProjectId(LoadConfiguration configuration) {
LoadConfiguration.Builder builder = configuration.toBuilder();
builder.destinationTable(setProjectId(configuration.destinationTable()));
return builder.build();
}
}
Loading

0 comments on commit be1d946

Please sign in to comment.