Skip to content

Commit e468cad

Browse files
dhalperidavorbonaci
authored andcommitted
DatastoreIO: do not split when QuerySplitter fails
As opposed to throwing an exception. Fixes GoogleCloudPlatform#101. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=112999725
1 parent ec5dfe8 commit e468cad

File tree

2 files changed

+44
-1
lines changed

2 files changed

+44
-1
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/io/DatastoreIO.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,8 +326,16 @@ public List<Source> splitIntoBundles(long desiredBundleSizeBytes, PipelineOption
326326
return ImmutableList.of(this);
327327
}
328328

329+
List<Query> datastoreSplits;
330+
try {
331+
datastoreSplits = getSplitQueries(Ints.checkedCast(numSplits), options);
332+
} catch (IllegalArgumentException | DatastoreException e) {
333+
LOG.warn("Unable to parallelize the given query: {}", query, e);
334+
return ImmutableList.of(this);
335+
}
336+
329337
ImmutableList.Builder<Source> splits = ImmutableList.builder();
330-
for (Query splitQuery : getSplitQueries(Ints.checkedCast(numSplits), options)) {
338+
for (Query splitQuery : datastoreSplits) {
331339
splits.add(new Source(host, datasetId, splitQuery, namespace));
332340
}
333341
return splits.build();

sdk/src/test/java/com/google/cloud/dataflow/sdk/io/DatastoreIOTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import static org.mockito.Mockito.mock;
3030
import static org.mockito.Mockito.never;
3131
import static org.mockito.Mockito.spy;
32+
import static org.mockito.Mockito.times;
3233
import static org.mockito.Mockito.verify;
3334
import static org.mockito.Mockito.verifyNoMoreInteractions;
3435
import static org.mockito.Mockito.when;
@@ -52,6 +53,7 @@
5253
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
5354
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
5455
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
56+
import com.google.cloud.dataflow.sdk.testing.ExpectedLogs;
5557
import com.google.cloud.dataflow.sdk.util.TestCredential;
5658
import com.google.common.collect.Lists;
5759

@@ -97,6 +99,8 @@ public class DatastoreIOTest {
9799
@Rule
98100
public final ExpectedException thrown = ExpectedException.none();
99101

102+
@Rule public final ExpectedLogs logged = ExpectedLogs.none(DatastoreIO.Source.class);
103+
100104
@Before
101105
public void setUp() {
102106
MockitoAnnotations.initMocks(this);
@@ -330,6 +334,37 @@ public void testQueryDoesNotSplitWithLimitSet() throws Exception {
330334
verifyNoMoreInteractions(splitter);
331335
}
332336

337+
/**
338+
* Tests that when {@link QuerySplitter} cannot split a query, {@link DatastoreIO} falls back to
339+
* a single split.
340+
*/
341+
@Test
342+
public void testQuerySplitterThrows() throws Exception {
343+
// Mock query splitter that throws IllegalArgumentException
344+
IllegalArgumentException exception =
345+
new IllegalArgumentException("query not supported by splitter");
346+
QuerySplitter splitter = mock(QuerySplitter.class);
347+
when(
348+
splitter.getSplits(
349+
any(Query.class), any(PartitionId.class), any(Integer.class), any(Datastore.class)))
350+
.thenThrow(exception);
351+
352+
Query query = Query.newBuilder().addKind(KindExpression.newBuilder().setName("myKind")).build();
353+
List<DatastoreIO.Source> bundles =
354+
initialSource
355+
.withQuery(query)
356+
.withMockSplitter(splitter)
357+
.withMockEstimateSizeBytes(10240L)
358+
.splitIntoBundles(1024, testPipelineOptions(null));
359+
360+
assertEquals(1, bundles.size());
361+
assertEquals(query, bundles.get(0).getQuery());
362+
verify(splitter, times(1))
363+
.getSplits(
364+
any(Query.class), any(PartitionId.class), any(Integer.class), any(Datastore.class));
365+
logged.verifyWarn("Unable to parallelize the given query", exception);
366+
}
367+
333368
@Test
334369
public void testQuerySplitSizeUnavailable() throws Exception {
335370
KindExpression mykind = KindExpression.newBuilder().setName("mykind").build();

0 commit comments

Comments
 (0)