This repository has been archived by the owner on Nov 11, 2022. It is now read-only.

Commit bb0f496
Removes unnecessary interface SourceFormat
SourceFormat has no subclasses except BasicSerializableSourceFormat,
and was never intended to get more. This change removes the entire
level of abstraction associated with SourceFormat: the interface itself,
SourceFormatFactory, and the bridge interfaces around SourceOperation.
It also renames BasicSerializableSourceFormat to CustomSources.
----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=104696163
jkff authored and davorbonaci committed Oct 7, 2015
1 parent b1455a8 commit bb0f496
Showing 14 changed files with 63 additions and 308 deletions.
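The shape of the refactoring, in miniature: a single-implementation interface whose instances existed only to carry a constructor argument is collapsed into a static helper class, and that argument moves into the parameter list. A minimal sketch of the pattern, using simplified hypothetical types (String stands in for the real request, response, and PipelineOptions classes):

// Before: an interface with exactly one implementation, never meant to grow.
interface Format {
  String performOperation(String request) throws Exception;
}

class BasicHelper implements Format {
  private final String options; // stand-in for PipelineOptions

  BasicHelper(String options) {
    this.options = options;
  }

  @Override
  public String performOperation(String request) {
    return request + " using " + options;
  }
}

// After: no interface and no instance state; the dependency is an explicit
// argument of a static method, so callers need neither a factory nor a cast.
final class Helper {
  private Helper() {} // static-only helper, never instantiated

  static String performOperation(String request, String options) {
    return request + " using " + options;
  }
}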
sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Read.java (2 additions, 2 deletions)
@@ -20,7 +20,7 @@
 
 import com.google.cloud.dataflow.sdk.coders.Coder;
 import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat;
+import com.google.cloud.dataflow.sdk.runners.dataflow.CustomSources;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
 import com.google.cloud.dataflow.sdk.values.PCollection;
@@ -151,7 +151,7 @@ private static void registerDefaultTransformEvaluator() {
         @Override
         public void evaluate(
             Bounded transform, DirectPipelineRunner.EvaluationContext context) {
-          BasicSerializableSourceFormat.evaluateReadHelper(transform, context);
+          CustomSources.evaluateReadHelper(transform, context);
         }
       });
   }
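For orientation: Read.Bounded is what a user pipeline creates when applying Read.from to a custom source, and the evaluator registered above is how the direct runner routes it to CustomSources.evaluateReadHelper. A hedged usage sketch (MyBoundedSource is a hypothetical BoundedSource<String> implementation, not part of the SDK):

// Reading from a user-defined bounded source; the runner picks the evaluator.
Pipeline p = Pipeline.create(options);
p.apply(Read.from(new MyBoundedSource("gs://my-bucket/input-*")));
p.run();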
@@ -42,7 +42,7 @@
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
 import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.JobSpecification;
-import com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat;
+import com.google.cloud.dataflow.sdk.runners.dataflow.CustomSources;
 import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowAggregatorTransforms;
 import com.google.cloud.dataflow.sdk.transforms.Aggregator;
 import com.google.cloud.dataflow.sdk.transforms.Combine;
@@ -599,7 +599,7 @@ private static class ReadWithIdsTranslator
     @Override
     public void translate(ReadWithIds<?> transform,
         DataflowPipelineTranslator.TranslationContext context) {
-      BasicSerializableSourceFormat.translateReadHelper(
+      CustomSources.translateReadHelper(
           transform.getSource(), transform, context);
     }
   }
BasicSerializableSourceFormat.java → CustomSources.java (renamed with the public class)
@@ -18,9 +18,7 @@
 
 import static com.google.api.client.util.Base64.decodeBase64;
 import static com.google.api.client.util.Base64.encodeBase64String;
-import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.cloudSourceOperationResponseToSourceOperationResponse;
 import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.cloudSourceToDictionary;
-import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.sourceOperationRequestToCloudSourceOperationRequest;
 import static com.google.cloud.dataflow.sdk.util.SerializableUtils.deserializeFromByteArray;
 import static com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray;
 import static com.google.cloud.dataflow.sdk.util.Structs.addString;
@@ -33,8 +31,6 @@
 import com.google.api.services.dataflow.model.ApproximateProgress;
 import com.google.api.services.dataflow.model.DerivedSource;
 import com.google.api.services.dataflow.model.DynamicSourceSplit;
-import com.google.api.services.dataflow.model.SourceGetMetadataRequest;
-import com.google.api.services.dataflow.model.SourceGetMetadataResponse;
 import com.google.api.services.dataflow.model.SourceMetadata;
 import com.google.api.services.dataflow.model.SourceOperationRequest;
 import com.google.api.services.dataflow.model.SourceOperationResponse;
@@ -62,7 +58,6 @@
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.util.common.CounterSet;
 import com.google.cloud.dataflow.sdk.util.common.worker.Reader;
-import com.google.cloud.dataflow.sdk.util.common.worker.SourceFormat;
 import com.google.cloud.dataflow.sdk.values.PValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -84,11 +79,10 @@
 /**
  * A helper class for supporting sources defined as {@code Source}.
  *
- * <p>Provides a bridge between the high-level {@code Source} API and the raw
- * API-level {@code SourceFormat} API, by encoding the serialized
- * {@code Source} in a parameter of the API {@code Source} message.
+ * <p>Provides a bridge between the high-level {@code Source} API and the
+ * low-level {@code CloudSource} class.
  */
-public class BasicSerializableSourceFormat implements SourceFormat {
+public class CustomSources {
   private static final String SERIALIZED_SOURCE = "serialized_source";
   @VisibleForTesting static final String SERIALIZED_SOURCE_SPLITS = "serialized_source_splits";
   private static final long DEFAULT_DESIRED_BUNDLE_SIZE_BYTES = 64 * (1 << 20);
@@ -102,13 +96,7 @@ public class BasicSerializableSourceFormat implements SourceFormat {
   // Maximum number of custom source splits currently supported by Dataflow.
   private static final int MAX_NUMBER_OF_SPLITS = 16000;
 
-  private static final Logger LOG = LoggerFactory.getLogger(BasicSerializableSourceFormat.class);
-
-  private final PipelineOptions options;
-
-  public BasicSerializableSourceFormat(PipelineOptions options) {
-    this.options = options;
-  }
+  private static final Logger LOG = LoggerFactory.getLogger(CustomSources.class);
 
   /**
    * A {@code DynamicSplitResult} specified explicitly by a pair of {@code BoundedSource}
@@ -156,23 +144,20 @@ public static DynamicSourceSplit toSourceSplit(
    * by deserializing its source to a {@code BoundedSource}, splitting it, and
    * serializing results back.
    */
-  @Override
-  public OperationResponse performSourceOperation(OperationRequest request) throws Exception {
-    SourceOperationRequest cloudRequest =
-        sourceOperationRequestToCloudSourceOperationRequest(request);
-    SourceOperationResponse cloudResponse = new SourceOperationResponse();
-    if (cloudRequest.getGetMetadata() != null) {
-      cloudResponse.setGetMetadata(performGetMetadata(cloudRequest.getGetMetadata()));
-    } else if (cloudRequest.getSplit() != null) {
-      cloudResponse.setSplit(performSplit(cloudRequest.getSplit()));
+  public static SourceOperationResponse performSourceOperation(
+      SourceOperationRequest request, PipelineOptions options) throws Exception {
+    SourceOperationResponse response = new SourceOperationResponse();
+    if (request.getSplit() != null) {
+      response.setSplit(performSplit(request.getSplit(), options));
     } else {
-      throw new UnsupportedOperationException("Unknown source operation request");
+      throw new UnsupportedOperationException(
+          "Unsupported source operation request: " + request);
     }
-    return cloudSourceOperationResponseToSourceOperationResponse(cloudResponse);
+    return response;
   }
 
   /**
-   * Factory to create a {@link BasicSerializableSourceFormat} from a Dataflow API
+   * Factory to create a {@link CustomSources} from a Dataflow API
    * source specification.
    */
   public static class Factory implements ReaderFactory {
@@ -188,7 +173,7 @@ public Reader<?> create(
       // The parameter "coder" is deliberately never used. It is an artifact of ReaderFactory:
       // some readers need a coder, some don't (i.e. for some it doesn't even make sense),
       // but ReaderFactory passes it to all readers anyway.
-      return BasicSerializableSourceFormat.create(spec, options, executionContext);
+      return CustomSources.create(spec, options, executionContext);
     }
   }
 
@@ -297,7 +282,9 @@ private UnboundedSource<T, UnboundedSource.CheckpointMark> parseSource(int index
     }
   }
 
-  private SourceSplitResponse performSplit(SourceSplitRequest request) throws Exception {
+  private static SourceSplitResponse performSplit(
+      SourceSplitRequest request, PipelineOptions options)
+      throws Exception {
     Source<?> anySource = deserializeFromCloudSource(request.getSource().getSpec());
     if (!(anySource instanceof BoundedSource)) {
      throw new UnsupportedOperationException("Cannot split a non-Bounded source: " + anySource);
@@ -349,20 +336,6 @@ private SourceSplitResponse performSplit(SourceSplitRequest request) throws Exception {
     return response;
   }
 
-  private SourceGetMetadataResponse performGetMetadata(SourceGetMetadataRequest request)
-      throws Exception {
-    Source<?> source = deserializeFromCloudSource(request.getSource().getSpec());
-    SourceMetadata metadata = new SourceMetadata();
-    if (source instanceof BoundedSource) {
-      BoundedSource<?> boundedSource = (BoundedSource<?>) source;
-      metadata.setProducesSortedKeys(boundedSource.producesSortedKeys(options));
-      metadata.setEstimatedSizeBytes(boundedSource.getEstimatedSizeBytes(options));
-    }
-    SourceGetMetadataResponse response = new SourceGetMetadataResponse();
-    response.setMetadata(metadata);
-    return response;
-  }
-
   public static Source<?> deserializeFromCloudSource(Map<String, Object> spec) throws Exception {
     Source<?> source = (Source<?>) deserializeFromByteArray(
         Base64.decodeBase64(getString(spec, SERIALIZED_SOURCE)), "Source");
@@ -390,7 +363,7 @@ public static com.google.api.services.dataflow.model.Source serializeToCloudSource(
     com.google.api.services.dataflow.model.Source cloudSource =
         new com.google.api.services.dataflow.model.Source();
     // We ourselves act as the SourceFormat.
-    cloudSource.setSpec(CloudObject.forClass(BasicSerializableSourceFormat.class));
+    cloudSource.setSpec(CloudObject.forClass(CustomSources.class));
     addString(
         cloudSource.getSpec(), SERIALIZED_SOURCE, encodeBase64String(serializeToByteArray(source)));
 
@@ -655,7 +628,9 @@ public boolean hasNext() throws IOException {
       }
       try {
         Thread.sleep(nextBackoff);
-      } catch (InterruptedException e) {}
+      } catch (InterruptedException e) {
+        // ignore.
+      }
     }
     return true;
   }
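Taken together, the hunks above turn splitting into a stateless round-trip through the Dataflow API model. A condensed, hedged paraphrase of the new performSplit (the MAX_NUMBER_OF_SPLITS cap and split-response size checks are elided here):

// Deserialize the user's Source from the cloud spec and reject unbounded sources.
Source<?> anySource = deserializeFromCloudSource(request.getSource().getSpec());
if (!(anySource instanceof BoundedSource)) {
  throw new UnsupportedOperationException("Cannot split a non-Bounded source: " + anySource);
}
BoundedSource<?> source = (BoundedSource<?>) anySource;
// Delegate to the SDK's splitting hook; each resulting bundle is re-serialized
// (see serializeToCloudSource below) onto the SourceSplitResponse.
List<? extends BoundedSource<?>> bundles =
    source.splitIntoBundles(DEFAULT_DESIRED_BUNDLE_SIZE_BYTES, options);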
ReadTranslator.java
@@ -27,6 +27,6 @@
 public class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
   @Override
   public void translate(Read.Bounded<?> transform, TranslationContext context) {
-    BasicSerializableSourceFormat.translateReadHelper(transform.getSource(), transform, context);
+    CustomSources.translateReadHelper(transform.getSource(), transform, context);
   }
 }
@@ -18,22 +18,21 @@
 
 import static com.google.cloud.dataflow.sdk.runners.worker.SourceOperationExecutor.SPLIT_RESPONSE_TOO_LARGE_ERROR;
 import static com.google.cloud.dataflow.sdk.runners.worker.SourceOperationExecutor.isSplitResponseTooLarge;
-import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.cloudSourceOperationResponseToSourceOperationResponse;
 import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.readerProgressToCloudProgress;
-import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.sourceOperationResponseToCloudSourceOperationResponse;
 import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.toCloudPosition;
 
 import com.google.api.client.googleapis.json.GoogleJsonResponseException;
 import com.google.api.services.dataflow.model.MetricStructuredName;
 import com.google.api.services.dataflow.model.MetricUpdate;
 import com.google.api.services.dataflow.model.SideInputInfo;
+import com.google.api.services.dataflow.model.SourceOperationResponse;
 import com.google.api.services.dataflow.model.Status;
 import com.google.api.services.dataflow.model.WorkItem;
 import com.google.api.services.dataflow.model.WorkItemServiceState;
 import com.google.api.services.dataflow.model.WorkItemStatus;
 import com.google.cloud.dataflow.sdk.options.DataflowWorkerHarnessOptions;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat;
+import com.google.cloud.dataflow.sdk.runners.dataflow.CustomSources;
 import com.google.cloud.dataflow.sdk.runners.worker.logging.DataflowWorkerLoggingHandler;
 import com.google.cloud.dataflow.sdk.util.BatchModeExecutionContext;
 import com.google.cloud.dataflow.sdk.util.CloudCounterUtils;
@@ -46,7 +45,6 @@
 import com.google.cloud.dataflow.sdk.util.common.CounterSet;
 import com.google.cloud.dataflow.sdk.util.common.Metric;
 import com.google.cloud.dataflow.sdk.util.common.worker.Reader;
-import com.google.cloud.dataflow.sdk.util.common.worker.SourceFormat;
 import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler;
 import com.google.cloud.dataflow.sdk.util.common.worker.WorkExecutor;
 import com.google.cloud.dataflow.sdk.values.PCollectionView;
@@ -155,7 +153,7 @@ private boolean doWork(WorkItem workItem) throws IOException {
     LOG.debug("Executing: {}", workItem);
 
     WorkExecutor worker = null;
-    SourceFormat.OperationResponse operationResponse = null;
+    SourceOperationResponse operationResponse = null;
     long nextReportIndex = workItem.getInitialReportIndex();
     try {
       // Populate PipelineOptions with data from work unit.
@@ -214,8 +212,7 @@
       // into the work update.
       operationResponse =
           (worker instanceof SourceOperationExecutor)
-              ? cloudSourceOperationResponseToSourceOperationResponse(
-                  ((SourceOperationExecutor) worker).getResponse())
+              ? ((SourceOperationExecutor) worker).getResponse()
              : null;
 
      try {
@@ -283,7 +280,7 @@ private void handleWorkError(WorkItem workItem, WorkExecutor worker, long nextReportIndex
 
   private void reportStatus(DataflowWorkerHarnessOptions options, String status, WorkItem workItem,
       @Nullable CounterSet counters, @Nullable Collection<Metric<?>> metrics,
-      @Nullable SourceFormat.OperationResponse operationResponse, @Nullable List<Status> errors,
+      @Nullable SourceOperationResponse operationResponse, @Nullable List<Status> errors,
       long reportIndex)
       throws IOException {
     String message = "{} processing work item {}";
@@ -301,7 +298,7 @@ static WorkItemStatus buildStatus(WorkItem workItem, boolean completed,
       @Nullable CounterSet counters, @Nullable Collection<Metric<?>> metrics,
       DataflowWorkerHarnessOptions options, @Nullable Reader.Progress progress,
       @Nullable Reader.DynamicSplitResult dynamicSplitResult,
-      @Nullable SourceFormat.OperationResponse operationResponse, @Nullable List<Status> errors,
+      @Nullable SourceOperationResponse operationResponse, @Nullable List<Status> errors,
       long reportIndex) {
 
     return buildStatus(workItem, completed, counters, metrics, options, progress,
@@ -312,7 +309,7 @@ static WorkItemStatus buildStatus(WorkItem workItem, boolean completed,
       @Nullable CounterSet counters, @Nullable Collection<Metric<?>> metrics,
       DataflowWorkerHarnessOptions options, @Nullable Reader.Progress progress,
       @Nullable Reader.DynamicSplitResult dynamicSplitResult,
-      @Nullable SourceFormat.OperationResponse operationResponse, @Nullable List<Status> errors,
+      @Nullable SourceOperationResponse operationResponse, @Nullable List<Status> errors,
       long reportIndex,
       @Nullable StateSampler.StateSamplerInfo stateSamplerInfo) {
     WorkItemStatus status = new WorkItemStatus();
@@ -378,18 +375,17 @@ static WorkItemStatus buildStatus(WorkItem workItem, boolean completed,
       Reader.DynamicSplitResultWithPosition asPosition =
           (Reader.DynamicSplitResultWithPosition) dynamicSplitResult;
       status.setStopPosition(toCloudPosition(asPosition.getAcceptedPosition()));
-    } else if (dynamicSplitResult instanceof BasicSerializableSourceFormat.BoundedSourceSplit) {
+    } else if (dynamicSplitResult instanceof CustomSources.BoundedSourceSplit) {
       status.setDynamicSourceSplit(
-          BasicSerializableSourceFormat.toSourceSplit(
-              (BasicSerializableSourceFormat.BoundedSourceSplit<?>) dynamicSplitResult, options));
+          CustomSources.toSourceSplit(
+              (CustomSources.BoundedSourceSplit<?>) dynamicSplitResult, options));
     } else if (dynamicSplitResult != null) {
       throw new IllegalArgumentException(
           "Unexpected type of dynamic split result: " + dynamicSplitResult);
     }
 
     if (workItem.getSourceOperationTask() != null) {
-      status.setSourceOperationResponse(
-          sourceOperationResponseToCloudSourceOperationResponse(operationResponse));
+      status.setSourceOperationResponse(operationResponse);
     }
 
     return status;
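The worker-side change is the removal of a translation layer: the Dataflow API model type SourceOperationResponse now flows from the executor into the work-item status untouched. A condensed before/after, reconstructed from the hunks above:

// Before: a worker-side wrapper type, converted at both boundaries.
SourceFormat.OperationResponse operationResponse =
    cloudSourceOperationResponseToSourceOperationResponse(
        ((SourceOperationExecutor) worker).getResponse());
status.setSourceOperationResponse(
    sourceOperationResponseToCloudSourceOperationResponse(operationResponse));

// After: the API model type is used end to end.
SourceOperationResponse operationResponse = ((SourceOperationExecutor) worker).getResponse();
status.setSourceOperationResponse(operationResponse);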
@@ -19,7 +19,7 @@
 import com.google.api.services.dataflow.model.Source;
 import com.google.cloud.dataflow.sdk.coders.Coder;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat;
+import com.google.cloud.dataflow.sdk.runners.dataflow.CustomSources;
 import com.google.cloud.dataflow.sdk.util.CloudObject;
 import com.google.cloud.dataflow.sdk.util.CloudSourceUtils;
 import com.google.cloud.dataflow.sdk.util.ExecutionContext;
@@ -95,8 +95,8 @@ public static Registry defaultRegistry() {
 
     // Custom sources
     factories.put(
-        "com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat",
-        new BasicSerializableSourceFormat.Factory());
+        "com.google.cloud.dataflow.sdk.runners.dataflow.CustomSources",
+        new CustomSources.Factory());
 
     return new Registry(factories);
   }
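This registration encodes a coupling worth noting: CustomSources.serializeToCloudSource stamps each cloud Source spec with CloudObject.forClass(CustomSources.class), and defaultRegistry() must map the resulting fully qualified class name back to a ReaderFactory. The two sides have to move in lockstep, which is why both are renamed together in this commit. The two lines that must agree:

// Producer (CustomSources.serializeToCloudSource): the spec is keyed by class name.
cloudSource.setSpec(CloudObject.forClass(CustomSources.class));
// Consumer (defaultRegistry): the same string selects the reader factory.
factories.put(
    "com.google.cloud.dataflow.sdk.runners.dataflow.CustomSources",
    new CustomSources.Factory());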

This file was deleted.

