Skip to content

Commit 4fa7bd3

Browse files
dhalperidavorbonaci
authored andcommitted
StreamingWriteFn: check if table exists before creating
In StreamingWriteFn, every worker tries to create the table without checking beforehand whether the table exists. This behavior can lead to temporarily violating the BigQuery API quota limits on table.insert: { "code" : 403, "errors" : [ { "domain" : "global", "location" : "table.write", "locationType" : "other", "message" : "Exceeded rate limits: Your table exceeded quota for table.insert or table.update per table. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors";, "reason" : "rateLimitExceeded" } ], "message" : "Exceeded rate limits: Your table exceeded quota for table.insert or table.update per table. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors"; } Note that although this error appears severe, it should not cause jobs to fail. This change primarily aims to reduce the occurrence of this alarming log. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=112818882
1 parent cfda3ff commit 4fa7bd3

File tree

1 file changed

+4
-1
lines changed

1 file changed

+4
-1
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
3232
import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
3333
import com.google.cloud.dataflow.sdk.coders.VoidCoder;
34+
import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition;
35+
import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition;
3436
import com.google.cloud.dataflow.sdk.options.BigQueryOptions;
3537
import com.google.cloud.dataflow.sdk.options.GcpOptions;
3638
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
@@ -1086,7 +1088,8 @@ public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec
10861088
TableSchema tableSchema = JSON_FACTORY.fromString(jsonTableSchema, TableSchema.class);
10871089
Bigquery client = Transport.newBigQueryClient(options).build();
10881090
BigQueryTableInserter inserter = new BigQueryTableInserter(client);
1089-
inserter.tryCreateTable(tableReference, tableSchema);
1091+
inserter.getOrCreateTable(tableReference, WriteDisposition.WRITE_APPEND,
1092+
CreateDisposition.CREATE_IF_NEEDED, tableSchema);
10901093
createdTables.add(tableSpec);
10911094
}
10921095
}

0 commit comments

Comments
 (0)