Commit 50f98a7
dhalperi authored and davorbonaci committed
1 parent: e468cad

BigQueryTableInserter: retry rateLimitExceeded API calls

When using BigQueryIO with a custom table per window, we may temporarily exceed
BigQuery's quota. In these cases, retry for a short period of time before failing.

----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=113010175
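
For context, a hedged sketch of the configuration the commit message describes: a BigQueryIO
sink that writes each window to its own table, so every new window triggers a table-creation
call. This is not code from the commit; the to(SerializableFunction<BoundedWindow, String>)
overload, the class name, and the table names are recalled from the Dataflow 1.x SDK and should
be treated as assumptions.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.values.PCollection;

import java.util.Arrays;

class PerWindowTableWriteSketch {
  // Writes an already-windowed PCollection of TableRows, one BigQuery table per window.
  // Each new window leads to a tables.insert call, which is how a pipeline can brush up
  // against the table-creation quota that this commit retries on.
  static void writePerWindowTables(PCollection<TableRow> windowedRows) {
    TableSchema schema = new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("user").setType("STRING"),
        new TableFieldSchema().setName("count").setType("INTEGER")));

    windowedRows.apply(BigQueryIO.Write
        .to(new SerializableFunction<BoundedWindow, String>() {
          @Override
          public String apply(BoundedWindow window) {
            // Hypothetical naming scheme: one table per window, keyed by the window's end.
            return "myproject:mydataset.events_" + window.maxTimestamp().getMillis();
          }
        })
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
  }
}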

File tree

2 files changed: +294, -12 lines

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java
55 additions & 12 deletions

@@ -16,6 +16,10 @@
 
 package com.google.cloud.dataflow.sdk.util;
 
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.ExponentialBackOff;
+import com.google.api.client.util.Sleeper;
 import com.google.api.services.bigquery.Bigquery;
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
@@ -28,6 +32,7 @@
 import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition;
 import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -350,6 +355,13 @@ public boolean isEmpty(TableReference ref) throws IOException {
     return dataList.getRows() == null || dataList.getRows().isEmpty();
   }
 
+  /**
+   * Retry table creation up to 5 minutes (with exponential backoff) when this user is near the
+   * quota for table creation. This relatively innocuous behavior can happen when BigQueryIO is
+   * configured with a table spec function to use different tables for each window.
+   */
+  private static final int RETRY_CREATE_TABLE_DURATION_MILLIS = (int) TimeUnit.MINUTES.toMillis(5);
+
   /**
    * Tries to create the BigQuery table.
    * If a table with the same name already exists in the dataset, the table
@@ -365,21 +377,52 @@ public boolean isEmpty(TableReference ref) throws IOException {
   @Nullable
   public Table tryCreateTable(TableReference ref, TableSchema schema) throws IOException {
     LOG.info("Trying to create BigQuery table: {}", BigQueryIO.toTableSpec(ref));
+    BackOff backoff =
+        new ExponentialBackOff.Builder()
+            .setMaxElapsedTimeMillis(RETRY_CREATE_TABLE_DURATION_MILLIS)
+            .build();
 
-    Table content = new Table();
-    content.setTableReference(ref);
-    content.setSchema(schema);
+    Table table = new Table().setTableReference(ref).setSchema(schema);
+    return tryCreateTable(table, ref.getProjectId(), ref.getDatasetId(), backoff, Sleeper.DEFAULT);
+  }
 
-    try {
-      return client.tables()
-          .insert(ref.getProjectId(), ref.getDatasetId(), content)
-          .execute();
-    } catch (IOException e) {
-      if (new ApiErrorExtractor().itemAlreadyExists(e)) {
-        LOG.info("The BigQuery table already exists.");
-        return null;
+  @VisibleForTesting
+  @Nullable
+  Table tryCreateTable(
+      Table table, String projectId, String datasetId, BackOff backoff, Sleeper sleeper)
+      throws IOException {
+    boolean retry = false;
+    while (true) {
+      try {
+        return client.tables().insert(projectId, datasetId, table).execute();
+      } catch (IOException e) {
+        ApiErrorExtractor extractor = new ApiErrorExtractor();
+        if (extractor.itemAlreadyExists(e)) {
+          // The table already exists, nothing to return.
+          return null;
+        } else if (extractor.rateLimited(e)) {
+          // The request failed because we hit a temporary quota. Back off and try again.
+          try {
+            if (BackOffUtils.next(sleeper, backoff)) {
+              if (!retry) {
+                LOG.info(
+                    "Quota limit reached when creating table {}:{}.{}, retrying up to {} minutes",
+                    projectId,
+                    datasetId,
+                    table.getTableReference().getTableId(),
+                    TimeUnit.MILLISECONDS.toSeconds(RETRY_CREATE_TABLE_DURATION_MILLIS) / 60.0);
+                retry = true;
+              }
+              continue;
+            }
+          } catch (InterruptedException e1) {
+            // Restore interrupted state and throw the last failure.
+            Thread.currentThread().interrupt();
+            throw e;
+          }
+        }
+        throw e;
       }
-      throw e;
     }
   }
 }
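
The retry loop added above follows the standard google-http-client backoff idiom:
an ExponentialBackOff bounded by a maximum elapsed time, driven by BackOffUtils.next,
which sleeps and returns false once the backoff budget is exhausted. For reference,
a minimal standalone sketch of that idiom; the flaky doCall() here is made up and
stands in for the client.tables().insert(...).execute() call, so this is an
illustration of the pattern rather than code from this commit.

import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.client.util.Sleeper;

import java.io.IOException;

public class BackoffRetrySketch {
  private static int attempts = 0;

  // Hypothetical flaky operation: fails twice, then succeeds.
  private static String doCall() throws IOException {
    if (++attempts < 3) {
      throw new IOException("rateLimitExceeded (simulated)");
    }
    return "ok";
  }

  public static void main(String[] args) throws IOException, InterruptedException {
    // Same shape as the change above: exponential backoff bounded by a max elapsed time.
    BackOff backoff =
        new ExponentialBackOff.Builder().setMaxElapsedTimeMillis(30_000).build();
    Sleeper sleeper = Sleeper.DEFAULT;

    while (true) {
      try {
        System.out.println(doCall());
        return;
      } catch (IOException e) {
        // BackOffUtils.next sleeps for the next interval; false means the budget is spent.
        if (!BackOffUtils.next(sleeper, backoff)) {
          throw e; // out of retries: surface the last failure
        }
      }
    }
  }
}
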
BigQueryTableInserterTest.java (new file)
239 additions & 0 deletions

/*
 * Copyright (C) 2015 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.cloud.dataflow.sdk.util;

import static com.google.common.base.Verify.verifyNotNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import com.google.api.client.googleapis.json.GoogleJsonError;
import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
import com.google.api.client.googleapis.json.GoogleJsonErrorContainer;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.LowLevelHttpResponse;
import com.google.api.client.json.GenericJson;
import com.google.api.client.json.Json;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.client.testing.http.MockHttpTransport;
import com.google.api.client.testing.http.MockLowLevelHttpRequest;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.cloud.dataflow.sdk.testing.ExpectedLogs;
import com.google.cloud.hadoop.util.RetryBoundedBackOff;
import com.google.common.collect.ImmutableList;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

/**
 * Tests of {@link BigQueryTableInserter}.
 */
@RunWith(JUnit4.class)
public class BigQueryTableInserterTest {
  @Rule public ExpectedException thrown = ExpectedException.none();
  @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(BigQueryTableInserter.class);
  @Mock private LowLevelHttpResponse response;
  private Bigquery bigquery;

  @Before
  public void setUp() {
    MockitoAnnotations.initMocks(this);

    // A mock transport that lets us mock the API responses.
    MockHttpTransport transport =
        new MockHttpTransport.Builder()
            .setLowLevelHttpRequest(
                new MockLowLevelHttpRequest() {
                  @Override
                  public LowLevelHttpResponse execute() throws IOException {
                    return response;
                  }
                })
            .build();

    // A sample BigQuery API client that uses default JsonFactory and RetryHttpInitializer.
    bigquery =
        new Bigquery.Builder(
                transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer())
            .build();
  }

  @After
  public void tearDown() throws IOException {
    // These three interactions happen for every request in the normal response parsing.
    verify(response, atLeastOnce()).getContentEncoding();
    verify(response, atLeastOnce()).getHeaderCount();
    verify(response, atLeastOnce()).getReasonPhrase();
    verifyNoMoreInteractions(response);
  }

  /** A helper to wrap a {@link GenericJson} object in a content stream. */
  private static InputStream toStream(GenericJson content) throws IOException {
    return new ByteArrayInputStream(JacksonFactory.getDefaultInstance().toByteArray(content));
  }

  /** A helper that generates the error JSON payload that Google APIs produce. */
  private static GoogleJsonErrorContainer errorWithReasonAndStatus(String reason, int status) {
    ErrorInfo info = new ErrorInfo();
    info.setReason(reason);
    info.setDomain("global");
    // GoogleJsonError contains one or more ErrorInfo objects; our utilities read the first one.
    GoogleJsonError error = new GoogleJsonError();
    error.setErrors(ImmutableList.of(info));
    error.setCode(status);
    // The actual JSON response is an error container.
    GoogleJsonErrorContainer container = new GoogleJsonErrorContainer();
    container.setError(error);
    return container;
  }

  /**
   * Tests that {@link BigQueryTableInserter} succeeds on the first try.
   */
  @Test
  public void testCreateTableSucceeds() throws IOException {
    Table testTable = new Table().setDescription("a table");

    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
    when(response.getStatusCode()).thenReturn(200);
    when(response.getContent()).thenReturn(toStream(testTable));

    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);
    Table ret =
        inserter.tryCreateTable(
            new Table(),
            "project",
            "dataset",
            new RetryBoundedBackOff(0, BackOff.ZERO_BACKOFF),
            Sleeper.DEFAULT);
    assertEquals(testTable, ret);
    verify(response, times(1)).getStatusCode();
    verify(response, times(1)).getContent();
    verify(response, times(1)).getContentType();
  }

  /**
   * Tests that {@link BigQueryTableInserter} succeeds when the table already exists.
   */
  @Test
  public void testCreateTableSucceedsAlreadyExists() throws IOException {
    when(response.getStatusCode()).thenReturn(409); // 409 means already exists

    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);
    Table ret =
        inserter.tryCreateTable(
            new Table(),
            "project",
            "dataset",
            new RetryBoundedBackOff(0, BackOff.ZERO_BACKOFF),
            Sleeper.DEFAULT);

    assertNull(ret);
    verify(response, times(1)).getStatusCode();
    verify(response, times(1)).getContent();
    verify(response, times(1)).getContentType();
  }

  /**
   * Tests that {@link BigQueryTableInserter} retries quota rate limited attempts.
   */
  @Test
  public void testCreateTableRetry() throws IOException {
    TableReference ref =
        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
    Table testTable = new Table().setTableReference(ref);

    // First response is 403 rate limited, second response has valid payload.
    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
    when(response.getStatusCode()).thenReturn(403).thenReturn(200);
    when(response.getContent())
        .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
        .thenReturn(toStream(testTable));

    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);
    Table ret =
        inserter.tryCreateTable(
            testTable,
            "project",
            "dataset",
            new RetryBoundedBackOff(3, BackOff.ZERO_BACKOFF),
            Sleeper.DEFAULT);
    assertEquals(testTable, ret);
    verify(response, times(2)).getStatusCode();
    verify(response, times(2)).getContent();
    verify(response, times(2)).getContentType();
    verifyNotNull(ret.getTableReference());
    expectedLogs.verifyInfo(
        "Quota limit reached when creating table project:dataset.table, "
            + "retrying up to 5.0 minutes");
  }

  /**
   * Tests that {@link BigQueryTableInserter} does not retry non-rate-limited attempts.
   */
  @Test
  public void testCreateTableDoesNotRetry() throws IOException {
    Table testTable = new Table().setDescription("a table");

    // First response is 403 not-rate-limited, second response has valid payload but should not
    // be invoked.
    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
    when(response.getStatusCode()).thenReturn(403).thenReturn(200);
    when(response.getContent())
        .thenReturn(toStream(errorWithReasonAndStatus("actually forbidden", 403)))
        .thenReturn(toStream(testTable));

    thrown.expect(GoogleJsonResponseException.class);
    thrown.expectMessage("actually forbidden");

    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);
    try {
      inserter.tryCreateTable(
          new Table(),
          "project",
          "dataset",
          new RetryBoundedBackOff(3, BackOff.ZERO_BACKOFF),
          Sleeper.DEFAULT);
      fail();
    } catch (IOException e) {
      verify(response, times(1)).getStatusCode();
      verify(response, times(1)).getContent();
      verify(response, times(1)).getContentType();
      throw e;
    }
  }
}
