Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Sgmc/backport 1400 #510

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 26 additions & 8 deletions sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.QueryRequest;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
Expand Down Expand Up @@ -2200,7 +2201,8 @@ public TableSchema getSchema() {
/** Returns the table reference, or {@code null}. */
@Nullable
public ValueProvider<TableReference> getTable() {
return NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
return jsonTableRef == null ? null :
NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
}

/** Returns {@code true} if table validation is enabled. */
Expand Down Expand Up @@ -2622,6 +2624,11 @@ private static void verifyTablePresence(BigQueryOptions options, TableReference
}
}

@VisibleForTesting
static void clearCreatedTables() {
StreamingWriteFn.clearCreatedTables();
}

/////////////////////////////////////////////////////////////////////////////

/**
Expand Down Expand Up @@ -2654,6 +2661,12 @@ private static class StreamingWriteFn
NestedValueProvider.of(schema, new TableSchemaToJsonSchema());
}

private static void clearCreatedTables() {
synchronized (createdTables) {
createdTables.clear();
}
}

/** Prepares a target BigQuery table. */
@Override
public void startBundle(Context context) {
Expand Down Expand Up @@ -2696,20 +2709,25 @@ public void populateDisplayData(DisplayData.Builder builder) {
}

public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
throws IOException {
throws InterruptedException, IOException {
TableReference tableReference = parseTableSpec(tableSpec);
if (!createdTables.contains(tableSpec)) {
synchronized (createdTables) {
// Another thread may have succeeded in creating the table in the meanwhile, so
// check again. This check isn't needed for correctness, but we add it to prevent
// every thread from attempting a create and overwhelming our BigQuery quota.
DatasetService datasetService = bqServices.getDatasetService(options);
if (!createdTables.contains(tableSpec)) {
TableSchema tableSchema = JSON_FACTORY.fromString(
jsonTableSchema.get(), TableSchema.class);
Bigquery client = Transport.newBigQueryClient(options).build();
BigQueryTableInserter inserter = new BigQueryTableInserter(client);
inserter.getOrCreateTable(tableReference, Write.WriteDisposition.WRITE_APPEND,
Write.CreateDisposition.CREATE_IF_NEEDED, tableSchema);
Table table = datasetService.getTable(
tableReference.getProjectId(),
tableReference.getDatasetId(),
tableReference.getTableId());
if (table == null) {
TableSchema tableSchema = JSON_FACTORY.fromString(
jsonTableSchema.get(), TableSchema.class);
datasetService.createTable(
new Table().setTableReference(tableReference).setSchema(tableSchema));
}
createdTables.add(tableSpec);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.NoSuchElementException;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -141,6 +142,18 @@ void createDataset(
*/
void deleteDataset(String projectId, String datasetId)
throws IOException, InterruptedException;

void createTable(Table table) throws IOException;

boolean isTableEmpty(String projectId, String datasetId, String tableId)
throws IOException, InterruptedException;

Dataset getDataset(
String projectId, String datasetId) throws IOException, InterruptedException;

long insertAll(
TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
throws IOException, InterruptedException;
}

/**
Expand Down
Loading