Skip to content

Commit

Permalink
Add BundleFactory, ImmutabilityCheckingBundleFactory
Browse files Browse the repository at this point in the history
This allows checks to be made on the contents of bundles.
ImmutabilityCheckingBundleFactory produces bundles that ensure that
elements output to a bundle are not modified after being output.
  • Loading branch information
tgroh committed Apr 4, 2016
1 parent 5ed12d9 commit a6261e1
Show file tree
Hide file tree
Showing 18 changed files with 974 additions and 154 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (C) 2016 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.dataflow.sdk.runners.inprocess;

import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.values.PCollection;

/**
* A factory that creates {@link UncommittedBundle UncommittedBundles}.
*/
public interface BundleFactory {
/**
* Create an {@link UncommittedBundle} from an empty input. Elements added to the bundle belong to
* the {@code output} {@link PCollection}.
*/
public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output);

/**
* Create an {@link UncommittedBundle} from the specified input. Elements added to the bundle
* belong to the {@code output} {@link PCollection}.
*/
public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output);

/**
* Create an {@link UncommittedBundle} with the specified keys at the specified step. For use by
* {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms}. Elements added to the bundle
* belong to the {@code output} {@link PCollection}.
*/
public <T> UncommittedBundle<T> createKeyedBundle(
CommittedBundle<?> input, Object key, PCollection<T> output);
}

Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,9 @@ private boolean fireTimers() throws Exception {
KeyedWorkItems.timersWorkItem(keyTimers.getKey(), delivery);
@SuppressWarnings({"unchecked", "rawtypes"})
CommittedBundle<?> bundle =
InProcessBundle.<KeyedWorkItem<Object, Object>>keyed(
(PCollection) transform.getInput(), keyTimers.getKey())
evaluationContext
.createKeyedBundle(
null, keyTimers.getKey(), (PCollection) transform.getInput())
.add(WindowedValue.valueInEmptyWindows(work))
.commit(Instant.now());
scheduleConsumption(transform, bundle, new TimerCompletionCallback(delivery));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright (C) 2016 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.dataflow.sdk.runners.inprocess;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.client.util.Throwables;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
import com.google.cloud.dataflow.sdk.util.MutationDetector;
import com.google.cloud.dataflow.sdk.util.MutationDetectors;
import com.google.cloud.dataflow.sdk.util.SerializableUtils;
import com.google.cloud.dataflow.sdk.util.UserCodeException;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;

import org.joda.time.Instant;

/**
* A {@link BundleFactory} that ensures that elements added to it are not mutated after being
* output. Immutability checks are enforced at the time {@link UncommittedBundle#commit(Instant)} is
* called, checking the value at that time against the value at the time the element was added. All
* elements added to the bundle will be encoded by the {@link Coder} of the underlying
* {@link PCollection}.
*
* <p>This catches errors during the execution of a {@link DoFn} caused by modifying an element
* after it is added to an output {@link PCollection}.
*/
class ImmutabilityCheckingBundleFactory implements BundleFactory {
/**
* Create a new {@link ImmutabilityCheckingBundleFactory} that uses the underlying
* {@link BundleFactory} to create the output bundle.
*/
public static ImmutabilityCheckingBundleFactory create(BundleFactory underlying) {
return new ImmutabilityCheckingBundleFactory(underlying);
}

private final BundleFactory underlying;

private ImmutabilityCheckingBundleFactory(BundleFactory underlying) {
this.underlying = checkNotNull(underlying);
}

@Override
public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
return new ImmutabilityEnforcingBundle<>(underlying.createRootBundle(output));
}

@Override
public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) {
return new ImmutabilityEnforcingBundle<>(underlying.createBundle(input, output));
}

@Override
public <T> UncommittedBundle<T> createKeyedBundle(
CommittedBundle<?> input, Object key, PCollection<T> output) {
return new ImmutabilityEnforcingBundle<>(underlying.createKeyedBundle(input, key, output));
}

private static class ImmutabilityEnforcingBundle<T> implements UncommittedBundle<T> {
private final UncommittedBundle<T> underlying;
private final SetMultimap<WindowedValue<T>, MutationDetector> mutationDetectors;
private Coder<T> coder;

public ImmutabilityEnforcingBundle(UncommittedBundle<T> underlying) {
this.underlying = underlying;
mutationDetectors = HashMultimap.create();
coder = SerializableUtils.clone(getPCollection().getCoder());
}

@Override
public PCollection<T> getPCollection() {
return underlying.getPCollection();
}

@Override
public UncommittedBundle<T> add(WindowedValue<T> element) {
try {
mutationDetectors.put(
element, MutationDetectors.forValueWithCoder(element.getValue(), coder));
} catch (CoderException e) {
throw Throwables.propagate(e);
}
underlying.add(element);
return this;
}

@Override
public CommittedBundle<T> commit(Instant synchronizedProcessingTime) {
for (MutationDetector detector : mutationDetectors.values()) {
try {
detector.verifyUnmodified();
} catch (IllegalMutationException exn) {
throw UserCodeException.wrap(
new IllegalMutationException(
String.format(
"PTransform %s mutated value %s after it was output (new value was %s)."
+ " Values must not be mutated in any way after being output.",
underlying.getPCollection().getProducingTransformInternal().getFullName(),
exn.getSavedValue(),
exn.getNewValue()),
exn.getSavedValue(),
exn.getNewValue(),
exn));
}
}
return underlying.commit(synchronizedProcessingTime);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright (C) 2016 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.dataflow.sdk.runners.inprocess;

import static com.google.common.base.Preconditions.checkState;

import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;

import org.joda.time.Instant;

import javax.annotation.Nullable;

/**
* A factory that produces bundles that perform no additional validation.
*/
class InProcessBundleFactory implements BundleFactory {
public static InProcessBundleFactory create() {
return new InProcessBundleFactory();
}

private InProcessBundleFactory() {}

@Override
public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
return InProcessBundle.unkeyed(output);
}

@Override
public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) {
return input.isKeyed()
? InProcessBundle.keyed(output, input.getKey())
: InProcessBundle.unkeyed(output);
}

@Override
public <T> UncommittedBundle<T> createKeyedBundle(
CommittedBundle<?> input, Object key, PCollection<T> output) {
return InProcessBundle.keyed(output, key);
}

/**
* A {@link UncommittedBundle} that buffers elements in memory.
*/
private static final class InProcessBundle<T> implements UncommittedBundle<T> {
private final PCollection<T> pcollection;
private final boolean keyed;
private final Object key;
private boolean committed = false;
private ImmutableList.Builder<WindowedValue<T>> elements;

/**
* Create a new {@link InProcessBundle} for the specified {@link PCollection} without a key.
*/
public static <T> InProcessBundle<T> unkeyed(PCollection<T> pcollection) {
return new InProcessBundle<T>(pcollection, false, null);
}

/**
* Create a new {@link InProcessBundle} for the specified {@link PCollection} with the specified
* key.
*
* <p>See {@link CommittedBundle#getKey()} and {@link CommittedBundle#isKeyed()} for more
* information.
*/
public static <T> InProcessBundle<T> keyed(PCollection<T> pcollection, Object key) {
return new InProcessBundle<T>(pcollection, true, key);
}

private InProcessBundle(PCollection<T> pcollection, boolean keyed, Object key) {
this.pcollection = pcollection;
this.keyed = keyed;
this.key = key;
this.elements = ImmutableList.builder();
}

@Override
public PCollection<T> getPCollection() {
return pcollection;
}

@Override
public InProcessBundle<T> add(WindowedValue<T> element) {
checkState(
!committed,
"Can't add element %s to committed bundle in PCollection %s",
element,
pcollection);
elements.add(element);
return this;
}

@Override
public CommittedBundle<T> commit(final Instant synchronizedCompletionTime) {
checkState(!committed, "Can't commit already committed bundle %s", this);
committed = true;
final Iterable<WindowedValue<T>> committedElements = elements.build();
return new CommittedBundle<T>() {
@Override
@Nullable
public Object getKey() {
return key;
}

@Override
public boolean isKeyed() {
return keyed;
}

@Override
public Iterable<WindowedValue<T>> getElements() {
return committedElements;
}

@Override
public PCollection<T> getPCollection() {
return pcollection;
}

@Override
public Instant getSynchronizedProcessingOutputWatermark() {
return synchronizedCompletionTime;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.omitNullValues()
.add("pcollection", pcollection)
.add("key", key)
.add("elements", committedElements)
.toString();
}
};
}
}
}

Loading

0 comments on commit a6261e1

Please sign in to comment.