Skip to content

Commit 7cecf6e

Browse files
committed
Cache result of BigQuerySourceBase.split
1 parent 67bfc90 commit 7cecf6e

File tree

2 files changed

+32
-14
lines changed

2 files changed

+32
-14
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1211,6 +1211,8 @@ private abstract static class BigQuerySourceBase extends BoundedSource<TableRow>
12111211
protected final BigQueryServices bqServices;
12121212
protected final ValueProvider<String> executingProject;
12131213

1214+
private List<BoundedSource<TableRow>> cachedSplitResult;
1215+
12141216
private BigQuerySourceBase(
12151217
String jobIdToken,
12161218
String extractDestinationDir,
@@ -1225,19 +1227,30 @@ private BigQuerySourceBase(
12251227
@Override
12261228
public List<BoundedSource<TableRow>> splitIntoBundles(
12271229
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
1228-
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
1229-
TableReference tableToExtract = getTableToExtract(bqOptions);
1230-
JobService jobService = bqServices.getJobService(bqOptions);
1231-
String extractJobId = getExtractJobId(jobIdToken);
1232-
List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService);
1233-
1234-
TableSchema tableSchema = bqServices.getDatasetService(bqOptions).getTable(
1235-
tableToExtract.getProjectId(),
1236-
tableToExtract.getDatasetId(),
1237-
tableToExtract.getTableId()).getSchema();
1238-
1239-
cleanupTempResource(bqOptions);
1240-
return createSources(tempFiles, tableSchema);
1230+
// splitIntoBundles() can be called multiple times, e.g. the Dataflow runner may call it again
1231+
// with a different desiredBundleSizeBytes if a previous call produced too many sources. We
1232+
// ignore desiredBundleSizeBytes anyway; regardless, we must not initiate another BigQuery
1233+
// extract job for repeated splitIntoBundles() calls.
1234+
if (cachedSplitResult == null) {
1235+
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
1236+
TableReference tableToExtract = getTableToExtract(bqOptions);
1237+
JobService jobService = bqServices.getJobService(bqOptions);
1238+
String extractJobId = getExtractJobId(jobIdToken);
1239+
List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService);
1240+
1241+
TableSchema tableSchema =
1242+
bqServices
1243+
.getDatasetService(bqOptions)
1244+
.getTable(
1245+
tableToExtract.getProjectId(),
1246+
tableToExtract.getDatasetId(),
1247+
tableToExtract.getTableId())
1248+
.getSchema();
1249+
1250+
cleanupTempResource(bqOptions);
1251+
cachedSplitResult = createSources(tempFiles, tableSchema);
1252+
}
1253+
return cachedSplitResult;
12411254
}
12421255

12431256
protected abstract TableReference getTableToExtract(BigQueryOptions bqOptions) throws Exception;

sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import static org.mockito.Matchers.eq;
3131
import static org.mockito.Mockito.doNothing;
3232
import static org.mockito.Mockito.doThrow;
33+
import static org.mockito.Mockito.times;
3334
import static org.mockito.Mockito.when;
3435

3536
import com.google.api.client.util.Data;
@@ -1130,10 +1131,14 @@ public void testBigQueryTableSourceInitSplit() throws Exception {
11301131

11311132
List<? extends BoundedSource<TableRow>> sources = bqSource.splitIntoBundles(100, options);
11321133
assertEquals(1, sources.size());
1134+
// Simulate a repeated call to splitIntoBundles(), as a Dataflow worker sometimes does.
1135+
sources = bqSource.splitIntoBundles(200, options);
1136+
assertEquals(1, sources.size());
11331137
BoundedSource<TableRow> actual = sources.get(0);
11341138
assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
11351139

1136-
Mockito.verify(mockJobService)
1140+
// A repeated call to splitIntoBundles() should not have caused a duplicate extract job.
1141+
Mockito.verify(mockJobService, times(1))
11371142
.startExtractJob(Mockito.<JobReference>any(), Mockito.<JobConfigurationExtract>any());
11381143
}
11391144

0 commit comments

Comments
 (0)