
Commit 12be5db

dhalperi authored and davorbonaci committed
DataflowPipelineRunner: retry source splitting when too many bundles
The Cloud Dataflow API limits the number of bytes it will accept in a request or response payload -- today, 20 MB. One common place this limit is reached is when a custom source is split into many bundles, typically because the initial size estimate is wildly inaccurate and the Dataflow service therefore requests bundles containing only a small amount of work. Today, messages hitting this API limit during initial splitting cause the job to fail.

This change adds a one-time step that increases the desired bundle size and retries splitting the source, letting jobs affected by inaccurate size estimation succeed in some cases. It also cleans up related code, generalizing and extracting some utility functions into common utility classes and reducing memory usage in pathological cases to prevent out-of-memory errors.

----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=113557076
1 parent ef71d47 commit 12be5db
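To make the retry arithmetic concrete, here is a standalone worked example. The 20 MB limit and 64 MB default match the constants in the diff below, but the 50 MB response size, the class name, and the variable names are hypothetical; this is only a sketch of the heuristic, not SDK code.

public class SplitRetryExpansionExample {
  public static void main(String[] args) {
    // Mirrors DATAFLOW_SPLIT_RESPONSE_API_SIZE_BYTES and DEFAULT_DESIRED_BUNDLE_SIZE_BYTES.
    long apiLimitBytes = 20L * (1 << 20);          // 20 MB split-response limit
    long desiredBundleSizeBytes = 64L * (1 << 20); // 64 MB default desired bundle size

    // Hypothetical outcome of the first split attempt: a 50 MB serialized response.
    long serializedSizeBytes = 50L * (1 << 20);

    if (serializedSizeBytes > apiLimitBytes) {
      // Scale the bundle size so the retried response lands near half the API limit:
      // expansion = 2 * 50 MB / 20 MB = 5, so the retry asks for 320 MB bundles.
      double expansion = 2 * (double) serializedSizeBytes / apiLimitBytes;
      long expandedBundleSizeBytes = (long) (desiredBundleSizeBytes * expansion);
      System.out.println("Retry splitting with desiredBundleSizeBytes = " + expandedBundleSizeBytes);
    }
  }
}

Assuming the serialized response shrinks roughly in proportion to the number of bundles, the retried split should then come in near 10 MB, comfortably under the limit.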

6 files changed (+211 additions, -118 deletions)


sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSources.java

Lines changed: 71 additions & 65 deletions
@@ -25,6 +25,7 @@
 import static com.google.cloud.dataflow.sdk.util.Structs.addStringList;
 import static com.google.cloud.dataflow.sdk.util.Structs.getString;
 import static com.google.cloud.dataflow.sdk.util.Structs.getStrings;
+import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.api.client.util.BackOff;
 import com.google.api.client.util.Base64;
@@ -33,7 +34,6 @@
 import com.google.api.services.dataflow.model.DerivedSource;
 import com.google.api.services.dataflow.model.DynamicSourceSplit;
 import com.google.api.services.dataflow.model.SourceMetadata;
-import com.google.api.services.dataflow.model.SourceOperationRequest;
 import com.google.api.services.dataflow.model.SourceOperationResponse;
 import com.google.api.services.dataflow.model.SourceSplitOptions;
 import com.google.api.services.dataflow.model.SourceSplitRequest;
@@ -47,6 +47,7 @@
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator;
 import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
+import com.google.cloud.dataflow.sdk.runners.worker.DataflowApiUtils;
 import com.google.cloud.dataflow.sdk.runners.worker.ReaderFactory;
 import com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils;
 import com.google.cloud.dataflow.sdk.runners.worker.StreamingModeExecutionContext;
@@ -61,7 +62,6 @@
 import com.google.cloud.dataflow.sdk.util.common.worker.NativeReader;
 import com.google.cloud.dataflow.sdk.values.PValue;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 
 import org.joda.time.Duration;
@@ -87,15 +87,11 @@ public class CustomSources {
   private static final String SERIALIZED_SOURCE = "serialized_source";
   @VisibleForTesting static final String SERIALIZED_SOURCE_SPLITS = "serialized_source_splits";
   private static final long DEFAULT_DESIRED_BUNDLE_SIZE_BYTES = 64 * (1 << 20);
-
-  public static final String TOO_MANY_SOURCE_SPLITS_ERROR =
-      "Total number of Source objects generated by splitIntoBundles() operation, %d, is"
-      + " larger than the allowable limit, %d. For more information, please check the corresponding"
-      + " FAQ entry at:\n"
-      + "https://cloud.google.com/dataflow/faq";
-
-  // Maximum number of custom source splits currently supported by Dataflow.
-  private static final int MAX_NUMBER_OF_SPLITS = 16000;
+  /**
+   * The current limit on the size of a ReportWorkItemStatus RPC to Google Cloud Dataflow, which
+   * includes the initial splits, is 20 MB.
+   */
+  public static final long DATAFLOW_SPLIT_RESPONSE_API_SIZE_BYTES = 20 * (1 << 20);
 
   private static final Logger LOG = LoggerFactory.getLogger(CustomSources.class);
 
@@ -144,17 +140,47 @@ public static DynamicSourceSplit toSourceSplit(
    * Executes a protocol-level split {@code SourceOperationRequest} for bounded sources
    * by deserializing its source to a {@code BoundedSource}, splitting it, and
    * serializing results back.
+   *
+   * <p>When the splits produced by this function are too large to be serialized to the Dataflow
+   * API, splitting is retried once with an increase in the desired bundle size. This change aims
+   * to work around API limitations on split size.
    */
-  public static SourceOperationResponse performSourceOperation(
-      SourceOperationRequest request, PipelineOptions options) throws Exception {
-    SourceOperationResponse response = new SourceOperationResponse();
-    if (request.getSplit() != null) {
-      response.setSplit(performSplit(request.getSplit(), options));
-    } else {
-      throw new UnsupportedOperationException(
-          "Unsupported source operation request: " + request);
-    }
-    return response;
+  public static SourceOperationResponse performSplit(
+      SourceSplitRequest request, PipelineOptions options) throws Exception {
+    Source<?> anySource = deserializeFromCloudSource(request.getSource().getSpec());
+    checkArgument(
+        anySource instanceof BoundedSource, "Cannot split a non-Bounded source: %s", anySource);
+    BoundedSource<?> source = (BoundedSource<?>) anySource;
+
+    // Compute the desired bundle size given by the service, or default if none was provided.
+    long desiredBundleSizeBytes = DEFAULT_DESIRED_BUNDLE_SIZE_BYTES;
+    SourceSplitOptions splitOptions = request.getOptions();
+    if (splitOptions != null && splitOptions.getDesiredBundleSizeBytes() != null) {
+      desiredBundleSizeBytes = splitOptions.getDesiredBundleSizeBytes();
+    }
+
+    // Try generating initial splits normally.
+    SourceSplitResponse splits = performSplit(source, options, desiredBundleSizeBytes);
+    long serializedSize = DataflowApiUtils.computeSerializedSizeBytes(splits);
+
+    // If split response is too large, scale desired size for expected DATAFLOW_API_SIZE_BYTES/2.
+    if (serializedSize > DATAFLOW_SPLIT_RESPONSE_API_SIZE_BYTES) {
+      double expansion = 2 * (double) serializedSize / DATAFLOW_SPLIT_RESPONSE_API_SIZE_BYTES;
+      long expandedBundleSizeBytes = (long) (desiredBundleSizeBytes * expansion);
+      LOG.warn(
+          "Splitting source {} into bundles of estimated size {} bytes produced {} bundles, which"
+              + " have total serialized size {} bytes. As this is too large for the Google Cloud"
+              + " Dataflow API, retrying splitting once with increased desiredBundleSizeBytes {}"
+              + " to reduce the number of splits.",
+          source,
+          desiredBundleSizeBytes,
+          splits.getBundles().size(),
+          serializedSize,
+          expandedBundleSizeBytes);
+      splits = performSplit(source, options, expandedBundleSizeBytes);
+    }
+
+    return new SourceOperationResponse().setSplit(splits);
   }
 
   /**
@@ -270,9 +296,8 @@ private UnboundedSource<T, UnboundedSource.CheckpointMark> parseSource(int index
       } catch (Exception e) {
         throw new RuntimeException("Parsing serialized source splits failed: ", e);
       }
-      Preconditions.checkArgument(
-          serializedSplits != null, "UnboundedSource object did not contain splits");
-      Preconditions.checkArgument(
+      checkArgument(serializedSplits != null, "UnboundedSource object did not contain splits");
+      checkArgument(
           index < serializedSplits.size(),
           "UnboundedSource splits contained too few splits. Requested index was %s, size was %s",
           index,
@@ -287,66 +312,48 @@ private UnboundedSource<T, UnboundedSource.CheckpointMark> parseSource(int index
   }
 
   private static SourceSplitResponse performSplit(
-      SourceSplitRequest request, PipelineOptions options)
+      BoundedSource<?> source, PipelineOptions options, long desiredBundleSizeBytes)
       throws Exception {
-    Source<?> anySource = deserializeFromCloudSource(request.getSource().getSpec());
-    if (!(anySource instanceof BoundedSource)) {
-      throw new UnsupportedOperationException("Cannot split a non-Bounded source: " + anySource);
-    }
-    BoundedSource<?> source = (BoundedSource<?>) anySource;
-    LOG.debug("Splitting source: {}", source);
+    LOG.debug("Splitting source {} into bundles of size {}", source, desiredBundleSizeBytes);
 
-    // Produce simple independent, unsplittable bundles with no metadata attached.
-    SourceSplitResponse response = new SourceSplitResponse();
-    response.setBundles(new ArrayList<DerivedSource>());
-    SourceSplitOptions splitOptions = request.getOptions();
-    Long desiredBundleSizeBytes =
-        (splitOptions == null) ? null : splitOptions.getDesiredBundleSizeBytes();
-    if (desiredBundleSizeBytes == null) {
-      desiredBundleSizeBytes = DEFAULT_DESIRED_BUNDLE_SIZE_BYTES;
-    }
     List<? extends BoundedSource<?>> bundles =
-        source.splitIntoBundles(desiredBundleSizeBytes, options);
-
-    if (bundles.size() > MAX_NUMBER_OF_SPLITS) {
-      throw new IOException(
-          String.format(TOO_MANY_SOURCE_SPLITS_ERROR, bundles.size(), MAX_NUMBER_OF_SPLITS));
-    }
+        ((BoundedSource<?>) source).splitIntoBundles(desiredBundleSizeBytes, options);
+    List<DerivedSource> splits = new ArrayList<>(bundles.size());
 
+    // Produce simple independent, unsplittable bundles with no metadata attached.
     LOG.debug("Splitting produced {} bundles", bundles.size());
     for (BoundedSource<?> split : bundles) {
       try {
         split.validate();
       } catch (Exception e) {
         throw new IllegalArgumentException(
-            "Splitting a valid source produced an invalid bundle. "
-                + "\nOriginal source: "
-                + source
-                + "\nInvalid bundle: "
-                + split,
+            String.format(
+                "Splitting a valid source produced an invalid source."
+                    + "\nOriginal source: %s\nInvalid source: %s",
+                source,
+                split),
             e);
       }
-      DerivedSource bundle = new DerivedSource();
 
-      com.google.api.services.dataflow.model.Source cloudSource =
-          serializeToCloudSource(split, options);
-      cloudSource.setDoesNotNeedSplitting(true);
-
-      bundle.setDerivationMode("SOURCE_DERIVATION_MODE_INDEPENDENT");
-      bundle.setSource(cloudSource);
-      response.getBundles().add(bundle);
+      splits.add(
+          new DerivedSource()
+              .setDerivationMode("SOURCE_DERIVATION_MODE_INDEPENDENT")
+              .setSource(serializeToCloudSource(split, options).setDoesNotNeedSplitting(true)));
     }
-    response.setOutcome("SOURCE_SPLIT_OUTCOME_SPLITTING_HAPPENED");
-    return response;
+
+    // Return all the splits in the SourceSplitResponse.
+    return new SourceSplitResponse()
+        .setBundles(splits)
+        .setOutcome("SOURCE_SPLIT_OUTCOME_SPLITTING_HAPPENED");
   }
 
-  public static Source<?> deserializeFromCloudSource(Map<String, Object> spec) throws Exception {
+  private static Source<?> deserializeFromCloudSource(Map<String, Object> spec) throws Exception {
     Source<?> source = (Source<?>) deserializeFromByteArray(
         Base64.decodeBase64(getString(spec, SERIALIZED_SOURCE)), "Source");
     try {
       source.validate();
     } catch (Exception e) {
-      LOG.error("Invalid source: " + source, e);
+      LOG.error("Invalid source: {}", source, e);
       throw e;
     }
     return source;
@@ -396,8 +403,7 @@ public static com.google.api.services.dataflow.model.Source serializeToCloudSour
           unboundedSource.generateInitialSplits(desiredNumSplits, options)) {
         encodedSplits.add(encodeBase64String(serializeToByteArray(split)));
       }
-      Preconditions.checkArgument(
-          !encodedSplits.isEmpty(), "UnboundedSources must have at least one split");
+      checkArgument(!encodedSplits.isEmpty(), "UnboundedSources must have at least one split");
       addStringList(cloudSource.getSpec(), SERIALIZED_SOURCE_SPLITS, encodedSplits);
     } else {
       throw new IllegalArgumentException("Unexpected source kind: " + source.getClass());
sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/DataflowApiUtils.java

Lines changed: 60 additions & 0 deletions

@@ -0,0 +1,60 @@
+/*******************************************************************************
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ ******************************************************************************/
+package com.google.cloud.dataflow.sdk.runners.worker;
+
+import com.google.api.client.json.GenericJson;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.JsonGenerator;
+import com.google.cloud.dataflow.sdk.util.Transport;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * A utility class for generic interactions with the Google Cloud Dataflow API.
+ */
+public final class DataflowApiUtils {
+  /**
+   * Determines the serialized size (in bytes) of the {@link GenericJson} object that will be
+   * serialized and sent to the Google Cloud Dataflow service API.
+   *
+   * <p>Uses only constant memory.
+   */
+  public static long computeSerializedSizeBytes(GenericJson object) throws IOException {
+    JsonFactory factory = object.getFactory();
+    if (factory == null) {
+      factory = Transport.getJsonFactory();
+    }
+
+    CountingOutputStream stream = new CountingOutputStream(ByteStreams.nullOutputStream());
+    JsonGenerator generator = null;
+    try {
+      generator = factory.createJsonGenerator(stream, StandardCharsets.UTF_8);
+      generator.serialize(object);
+      generator.close(); // also closes the stream.
+    } finally {
+      if (generator != null) {
+        generator.close();
+      }
+    }
+    return stream.getCount();
+  }
+
+  // Prevent construction of utility class.
+  private DataflowApiUtils() {}
+}
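
For reference, a minimal usage sketch of the new utility. The wrapper class, the toy response contents, and the printed message are hypothetical; only DataflowApiUtils.computeSerializedSizeBytes and the Dataflow model setters already shown in the diffs above are from this commit.

import com.google.api.services.dataflow.model.DerivedSource;
import com.google.api.services.dataflow.model.SourceSplitResponse;
import com.google.cloud.dataflow.sdk.runners.worker.DataflowApiUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SerializedSizeExample {
  public static void main(String[] args) throws IOException {
    // Build a tiny split response and measure how large it would be on the wire,
    // without buffering the serialized JSON in memory.
    List<DerivedSource> bundles = new ArrayList<>();
    bundles.add(new DerivedSource().setDerivationMode("SOURCE_DERIVATION_MODE_INDEPENDENT"));
    SourceSplitResponse response = new SourceSplitResponse()
        .setBundles(bundles)
        .setOutcome("SOURCE_SPLIT_OUTCOME_SPLITTING_HAPPENED");

    long sizeBytes = DataflowApiUtils.computeSerializedSizeBytes(response);
    System.out.println("Split response would serialize to " + sizeBytes + " bytes");
  }
}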

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/DataflowWorker.java

Lines changed: 12 additions & 5 deletions
@@ -41,6 +41,7 @@
 import com.google.cloud.dataflow.sdk.util.PCollectionViewWindow;
 import com.google.cloud.dataflow.sdk.util.SideInputReader;
 import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.Weighted;
 import com.google.cloud.dataflow.sdk.util.WeightedValue;
 import com.google.cloud.dataflow.sdk.util.common.Counter;
 import com.google.cloud.dataflow.sdk.util.common.CounterSet;
@@ -106,7 +107,7 @@ public class DataflowWorker {
   private final UserCodeTimeTracker userCodeTimeTracker = new UserCodeTimeTracker();
 
   /**
-   * A weight in "bytes" for the overhead of a {@link Sized} wrapper in the cache. It is just an
+   * A weight in "bytes" for the overhead of a {@link Weighted} wrapper in the cache. It is just an
    * approximation so it is OK for it to be fairly arbitrary as long as it is nonzero.
    */
   private static final int OVERHEAD_WEIGHT = 8;
@@ -154,7 +155,7 @@ private boolean doWork(WorkItem workItem) throws IOException {
     // Populate PipelineOptions with data from work unit.
     options.setProject(workItem.getProjectId());
 
-    DataflowExecutionContext executionContext =
+    DataflowExecutionContext<?> executionContext =
         new DataflowWorkerExecutionContext(sideInputCache, options);
 
     CounterSet counters = new CounterSet();
@@ -268,9 +269,15 @@ private void handleWorkError(WorkItem workItem, WorkExecutor worker, long nextRe
     // TODO: Attach the stack trace as exception details, not to the message.
     error.setMessage(DataflowWorkerLoggingHandler.formatException(t));
 
-    reportStatus(options, "Failure", workItem, worker == null ? null : worker.getOutputCounters(),
-        worker == null ? null : worker.getOutputMetrics(), null/*sourceOperationResponse*/,
-        error == null ? null : Collections.singletonList(error), nextReportIndex);
+    reportStatus(
+        options,
+        "Failure",
+        workItem,
+        worker == null ? null : worker.getOutputCounters(),
+        worker == null ? null : worker.getOutputMetrics(),
+        null /*sourceOperationResponse*/,
+        Collections.singletonList(error),
+        nextReportIndex);
   }
 
   private void reportStatus(DataflowWorkerHarnessOptions options, String status, WorkItem workItem,
