diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BundleFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BundleFactory.java
new file mode 100644
index 0000000000..84403afc8a
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BundleFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+/**
+ * A factory that creates {@link UncommittedBundle UncommittedBundles}.
+ */
+public interface BundleFactory {
+  /**
+   * Create an {@link UncommittedBundle} from an empty input. Elements added to the bundle belong to
+   * the {@code output} {@link PCollection}.
+   */
+  public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output);
+
+  /**
+   * Create an {@link UncommittedBundle} from the specified input. Elements added to the bundle
+   * belong to the {@code output} {@link PCollection}.
+   */
+  public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output);
+
+  /**
+   * Create an {@link UncommittedBundle} with the specified key. For use by
+   * {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms} and when delivering timers.
+   * Elements added to the bundle belong to the {@code output} {@link PCollection}.
+   *
+   * <p>{@code input} may be {@code null} if the bundle is not derived from an input bundle, as
+   * is the case when timers are delivered.
+   */
+  public <T> UncommittedBundle<T> createKeyedBundle(
+      CommittedBundle<?> input, Object key, PCollection<T> output);
+}
+
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java index fcd8182b8e..dd73d67f0c 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java @@ -372,8 +372,9 @@ private boolean fireTimers() throws Exception { KeyedWorkItems.timersWorkItem(keyTimers.getKey(), delivery); @SuppressWarnings({"unchecked", "rawtypes"}) CommittedBundle bundle = - InProcessBundle.>keyed( - (PCollection) transform.getInput(), keyTimers.getKey()) + evaluationContext + .createKeyedBundle( + null, keyTimers.getKey(), (PCollection) transform.getInput()) .add(WindowedValue.valueInEmptyWindows(work)) .commit(Instant.now()); scheduleConsumption(transform, bundle, new TimerCompletionCallback(delivery));
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java
new file mode 100644
index 0000000000..908636a2c2
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.client.util.Throwables;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
+import com.google.cloud.dataflow.sdk.util.MutationDetector;
+import com.google.cloud.dataflow.sdk.util.MutationDetectors;
+import com.google.cloud.dataflow.sdk.util.SerializableUtils;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
+
+import org.joda.time.Instant;
+
+/**
+ * A {@link BundleFactory} that ensures that elements added to it are not mutated after being
+ * output. Immutability checks are enforced at the time {@link UncommittedBundle#commit(Instant)} is
+ * called, checking the value at that time against the value at the time the element was added. All
+ * elements added to the bundle will be encoded by the {@link Coder} of the underlying
+ * {@link PCollection}.
+ *
+ * <p>This catches errors during the execution of a {@link DoFn} caused by modifying an element
+ * after it is added to an output {@link PCollection}.
+ */
+class ImmutabilityCheckingBundleFactory implements BundleFactory {
+  /** Creates a factory whose bundles wrap those of {@code underlying} with mutation checks. */
+  public static ImmutabilityCheckingBundleFactory create(BundleFactory underlying) {
+    return new ImmutabilityCheckingBundleFactory(underlying);
+  }
+
+  private final BundleFactory underlying;
+
+  private ImmutabilityCheckingBundleFactory(BundleFactory underlying) {
+    this.underlying = checkNotNull(underlying);
+  }
+
+  @Override
+  public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
+    return new ImmutabilityEnforcingBundle<>(underlying.createRootBundle(output));
+  }
+
+  @Override
+  public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) {
+    return new ImmutabilityEnforcingBundle<>(underlying.createBundle(input, output));
+  }
+
+  @Override
+  public <T> UncommittedBundle<T> createKeyedBundle(
+      CommittedBundle<?> input, Object key, PCollection<T> output) {
+    return new ImmutabilityEnforcingBundle<>(underlying.createKeyedBundle(input, key, output));
+  }
+
+  /** An {@link UncommittedBundle} that checks at commit that no added element was mutated. */
+  private static class ImmutabilityEnforcingBundle<T> implements UncommittedBundle<T> {
+    private final UncommittedBundle<T> underlying;
+    private final SetMultimap<WindowedValue<T>, MutationDetector> mutationDetectors;
+    private final Coder<T> coder;
+
+    public ImmutabilityEnforcingBundle(UncommittedBundle<T> underlying) {
+      this.underlying = underlying;
+      mutationDetectors = HashMultimap.create();
+      coder = SerializableUtils.clone(underlying.getPCollection().getCoder());
+    }
+
+    @Override
+    public PCollection<T> getPCollection() {
+      return underlying.getPCollection();
+    }
+
+    /** Registers a mutation detector for {@code element}, then adds it to the wrapped bundle. */
+    @Override
+    public UncommittedBundle<T> add(WindowedValue<T> element) {
+      try {
+        mutationDetectors.put(
+            element, MutationDetectors.forValueWithCoder(element.getValue(), coder));
+      } catch (CoderException e) {
+        throw Throwables.propagate(e);
+      }
+      underlying.add(element);
+      return this;
+    }
+
+    /** Commits the wrapped bundle, failing first if any added element has since been mutated. */
+    @Override
+    public CommittedBundle<T> commit(Instant synchronizedProcessingTime) {
+      for (MutationDetector detector : mutationDetectors.values()) {
+        try {
+          detector.verifyUnmodified();
+        } catch (IllegalMutationException exn) {
+          throw UserCodeException.wrap(
+              new IllegalMutationException(
+                  String.format(
+                      "PTransform %s mutated value %s after it was output (new value was %s)."
+                          + " Values must not be mutated in any way after being output.",
+                      underlying.getPCollection().getProducingTransformInternal().getFullName(),
+                      exn.getSavedValue(),
+                      exn.getNewValue()),
+                  exn.getSavedValue(),
+                  exn.getNewValue(),
+                  exn));
+        }
+      }
+      return underlying.commit(synchronizedProcessingTime);
+    }
+  }
+}
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java
new file mode 100644
index 0000000000..30bf3d6763
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+
+import org.joda.time.Instant;
+
+import javax.annotation.Nullable;
+
+/**
+ * A factory that produces bundles that perform no additional validation.
+ */
+class InProcessBundleFactory implements BundleFactory {
+  /** Creates a new {@link InProcessBundleFactory}. */
+  public static InProcessBundleFactory create() {
+    return new InProcessBundleFactory();
+  }
+
+  private InProcessBundleFactory() {}
+
+  @Override
+  public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
+    return InProcessBundle.unkeyed(output);
+  }
+
+  @Override
+  public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) {
+    // The output bundle is keyed exactly when the input bundle is, and shares its key.
+    return input.isKeyed()
+        ? InProcessBundle.keyed(output, input.getKey())
+        : InProcessBundle.unkeyed(output);
+  }
+
+  @Override
+  public <T> UncommittedBundle<T> createKeyedBundle(
+      CommittedBundle<?> input, Object key, PCollection<T> output) {
+    return InProcessBundle.keyed(output, key);
+  }
+
+  /** An {@link UncommittedBundle} that buffers elements in memory. */
+  private static final class InProcessBundle<T> implements UncommittedBundle<T> {
+    private final PCollection<T> pcollection;
+    private final boolean keyed;
+    private final Object key;
+    private boolean committed = false;
+    private final ImmutableList.Builder<WindowedValue<T>> elements;
+
+    /** Create a new unkeyed {@link InProcessBundle} for the specified {@link PCollection}. */
+    public static <T> InProcessBundle<T> unkeyed(PCollection<T> pcollection) {
+      return new InProcessBundle<T>(pcollection, false, null);
+    }
+
+    /**
+     * Create a new {@link InProcessBundle} for the specified {@link PCollection} with the specified
+     * key.
+     *
+     * <p>See {@link CommittedBundle#getKey()} and {@link CommittedBundle#isKeyed()} for more
+     * information.
+     */
+    public static <T> InProcessBundle<T> keyed(PCollection<T> pcollection, Object key) {
+      return new InProcessBundle<T>(pcollection, true, key);
+    }
+
+    private InProcessBundle(PCollection<T> pcollection, boolean keyed, Object key) {
+      this.pcollection = pcollection;
+      this.keyed = keyed;
+      this.key = key;
+      this.elements = ImmutableList.builder();
+    }
+
+    @Override
+    public PCollection<T> getPCollection() {
+      return pcollection;
+    }
+
+    /** Buffers {@code element}; fails if this bundle has already been committed. */
+    @Override
+    public InProcessBundle<T> add(WindowedValue<T> element) {
+      checkState(
+          !committed,
+          "Can't add element %s to committed bundle in PCollection %s",
+          element,
+          pcollection);
+      elements.add(element);
+      return this;
+    }
+
+    /** Freezes the buffered elements into a {@link CommittedBundle}; may be called only once. */
+    @Override
+    public CommittedBundle<T> commit(final Instant synchronizedCompletionTime) {
+      checkState(!committed, "Can't commit already committed bundle %s", this);
+      committed = true;
+      final Iterable<WindowedValue<T>> committedElements = elements.build();
+      return new CommittedBundle<T>() {
+        @Override
+        @Nullable
+        public Object getKey() {
+          return key;
+        }
+
+        @Override
+        public boolean isKeyed() {
+          return keyed;
+        }
+
+        @Override
+        public Iterable<WindowedValue<T>> getElements() {
+          return committedElements;
+        }
+
+        @Override
+        public PCollection<T> getPCollection() {
+          return pcollection;
+        }
+
+        @Override
+        public Instant getSynchronizedProcessingOutputWatermark() {
+          return synchronizedCompletionTime;
+        }
+
+        @Override
+        public String toString() {
+          return MoreObjects.toStringHelper(this)
+              .omitNullValues()
+              .add("pcollection", pcollection)
+              .add("key", key)
+              .add("elements", committedElements)
+              .toString();
+        }
+      };
+    }
+  }
+}
+
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
index 4aeb0d3b2a..b2aa45ef62 100644
--- 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java @@ -75,6 +75,7 @@ class InProcessEvaluationContext { /** The options that were used to create this {@link Pipeline}. */ private final InProcessPipelineOptions options; + private final BundleFactory bundleFactory; /** The current processing time and event time watermarks and timers. */ private final InMemoryWatermarkManager watermarkManager; @@ -91,21 +92,24 @@ class InProcessEvaluationContext { public static InProcessEvaluationContext create( InProcessPipelineOptions options, + BundleFactory bundleFactory, Collection> rootTransforms, Map>> valueToConsumers, Map, String> stepNames, Collection> views) { return new InProcessEvaluationContext( - options, rootTransforms, valueToConsumers, stepNames, views); + options, bundleFactory, rootTransforms, valueToConsumers, stepNames, views); } private InProcessEvaluationContext( InProcessPipelineOptions options, + BundleFactory bundleFactory, Collection> rootTransforms, Map>> valueToConsumers, Map, String> stepNames, Collection> views) { this.options = checkNotNull(options); + this.bundleFactory = checkNotNull(bundleFactory); checkNotNull(rootTransforms); checkNotNull(valueToConsumers); checkNotNull(stepNames); @@ -205,7 +209,7 @@ private void fireAvailableCallbacks(AppliedPTransform producingTransfor * Create a {@link UncommittedBundle} for use by a source. */ public UncommittedBundle createRootBundle(PCollection output) { - return InProcessBundle.unkeyed(output); + return bundleFactory.createRootBundle(output); } /** @@ -213,9 +217,7 @@ public UncommittedBundle createRootBundle(PCollection output) { * PCollection}. */ public UncommittedBundle createBundle(CommittedBundle input, PCollection output) { - return input.isKeyed() - ? 
InProcessBundle.keyed(output, input.getKey()) - : InProcessBundle.unkeyed(output); + return bundleFactory.createBundle(input, output); } /** @@ -224,7 +226,7 @@ public UncommittedBundle createBundle(CommittedBundle input, PCollecti */ public UncommittedBundle createKeyedBundle( CommittedBundle input, Object key, PCollection output) { - return InProcessBundle.keyed(output, key); + return bundleFactory.createKeyedBundle(input, key, output); } /** diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java index 82d5e170f6..9b6db8f730 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java @@ -230,6 +230,7 @@ public InProcessPipelineResult run(Pipeline pipeline) { InProcessEvaluationContext context = InProcessEvaluationContext.create( getPipelineOptions(), + createBundleFactory(getPipelineOptions()), consumerTrackingVisitor.getRootTransforms(), consumerTrackingVisitor.getValueToConsumers(), consumerTrackingVisitor.getStepNames(), @@ -269,6 +270,11 @@ public InProcessPipelineResult run(Pipeline pipeline) { return Collections.emptyMap(); } + /** Returns the {@link BundleFactory} for a run; {@code pipelineOptions} is currently unused. */ + private BundleFactory createBundleFactory(InProcessPipelineOptions pipelineOptions) { + return InProcessBundleFactory.create(); + } + /** * The result of running a {@link Pipeline} with the {@link InProcessPipelineRunner}. 
* diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java index e641dd6181..5de5a59b25 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java @@ -20,7 +20,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder; @@ -46,6 +45,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import java.io.IOException; import java.util.Arrays; @@ -61,20 +61,22 @@ public class BoundedReadEvaluatorFactoryTest { private PCollection longs; private TransformEvaluatorFactory factory; @Mock private InProcessEvaluationContext context; + private BundleFactory bundleFactory; @Before public void setup() { + MockitoAnnotations.initMocks(this); source = CountingSource.upTo(10L); TestPipeline p = TestPipeline.create(); longs = p.apply(Read.from(source)); factory = new BoundedReadEvaluatorFactory(); - context = mock(InProcessEvaluationContext.class); + bundleFactory = InProcessBundleFactory.create(); } @Test public void boundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception { - UncommittedBundle output = InProcessBundle.unkeyed(longs); + UncommittedBundle output = bundleFactory.createRootBundle(longs); when(context.createRootBundle(longs)).thenReturn(output); TransformEvaluator evaluator = @@ -94,8 +96,7 @@ public void boundedSourceInMemoryTransformEvaluatorProducesElements() throws Exc */ @Test public void 
boundedSourceInMemoryTransformEvaluatorAfterFinishIsEmpty() throws Exception { - UncommittedBundle output = - InProcessBundle.unkeyed(longs); + UncommittedBundle output = bundleFactory.createRootBundle(longs); when(context.createRootBundle(longs)).thenReturn(output); TransformEvaluator evaluator = @@ -109,7 +110,7 @@ public void boundedSourceInMemoryTransformEvaluatorAfterFinishIsEmpty() throws E containsInAnyOrder( gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L))); - UncommittedBundle secondOutput = InProcessBundle.unkeyed(longs); + UncommittedBundle secondOutput = bundleFactory.createRootBundle(longs); when(context.createRootBundle(longs)).thenReturn(secondOutput); TransformEvaluator secondEvaluator = factory.forApplication(longs.getProducingTransformInternal(), null, context); @@ -130,8 +131,8 @@ public void boundedSourceInMemoryTransformEvaluatorAfterFinishIsEmpty() throws E */ @Test public void boundedSourceEvaluatorSimultaneousEvaluations() throws Exception { - UncommittedBundle output = InProcessBundle.unkeyed(longs); - UncommittedBundle secondOutput = InProcessBundle.unkeyed(longs); + UncommittedBundle output = bundleFactory.createRootBundle(longs); + UncommittedBundle secondOutput = bundleFactory.createRootBundle(longs); when(context.createRootBundle(longs)).thenReturn(output).thenReturn(secondOutput); // create both evaluators before finishing either. 
@@ -169,7 +170,7 @@ public void boundedSourceEvaluatorClosesReader() throws Exception { PCollection pcollection = p.apply(Read.from(source)); AppliedPTransform sourceTransform = pcollection.getProducingTransformInternal(); - UncommittedBundle output = InProcessBundle.unkeyed(longs); + UncommittedBundle output = bundleFactory.createRootBundle(pcollection); when(context.createRootBundle(pcollection)).thenReturn(output); TransformEvaluator evaluator = factory.forApplication(sourceTransform, null, context); @@ -187,7 +188,7 @@ public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception { PCollection pcollection = p.apply(Read.from(source)); AppliedPTransform sourceTransform = pcollection.getProducingTransformInternal(); - UncommittedBundle output = InProcessBundle.unkeyed(longs); + UncommittedBundle output = bundleFactory.createRootBundle(pcollection); when(context.createRootBundle(pcollection)).thenReturn(output); TransformEvaluator evaluator = factory.forApplication(sourceTransform, null, context); diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java index 0120b9880d..42777a47e7 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java @@ -43,6 +43,7 @@ */ @RunWith(JUnit4.class) public class FlattenEvaluatorFactoryTest { + private BundleFactory bundleFactory = InProcessBundleFactory.create(); @Test public void testFlattenInMemoryEvaluator() throws Exception { TestPipeline p = TestPipeline.create(); @@ -52,13 +53,15 @@ public void testFlattenInMemoryEvaluator() throws Exception { PCollection flattened = list.apply(Flatten.pCollections()); - CommittedBundle leftBundle = InProcessBundle.unkeyed(left).commit(Instant.now()); - CommittedBundle 
rightBundle = InProcessBundle.unkeyed(right).commit(Instant.now()); + CommittedBundle leftBundle = + bundleFactory.createRootBundle(left).commit(Instant.now()); + CommittedBundle rightBundle = + bundleFactory.createRootBundle(right).commit(Instant.now()); InProcessEvaluationContext context = mock(InProcessEvaluationContext.class); - UncommittedBundle flattenedLeftBundle = InProcessBundle.unkeyed(flattened); - UncommittedBundle flattenedRightBundle = InProcessBundle.unkeyed(flattened); + UncommittedBundle flattenedLeftBundle = bundleFactory.createRootBundle(flattened); + UncommittedBundle flattenedRightBundle = bundleFactory.createRootBundle(flattened); when(context.createBundle(leftBundle, flattened)).thenReturn(flattenedLeftBundle); when(context.createBundle(rightBundle, flattened)).thenReturn(flattenedRightBundle); diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java index 4ced82f8c7..deb2a64e78 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java @@ -48,6 +48,8 @@ */ @RunWith(JUnit4.class) public class GroupByKeyEvaluatorFactoryTest { + private BundleFactory bundleFactory = InProcessBundleFactory.create(); + @Test public void testInMemoryEvaluator() throws Exception { TestPipeline p = TestPipeline.create(); @@ -65,15 +67,15 @@ public void testInMemoryEvaluator() throws Exception { kvs.apply(new GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly()); CommittedBundle>> inputBundle = - InProcessBundle.unkeyed(kvs).commit(Instant.now()); + bundleFactory.createRootBundle(kvs).commit(Instant.now()); InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); UncommittedBundle> fooBundle = - 
InProcessBundle.keyed(groupedKvs, "foo"); + bundleFactory.createKeyedBundle(null, "foo", groupedKvs); UncommittedBundle> barBundle = - InProcessBundle.keyed(groupedKvs, "bar"); + bundleFactory.createKeyedBundle(null, "bar", groupedKvs); UncommittedBundle> bazBundle = - InProcessBundle.keyed(groupedKvs, "baz"); + bundleFactory.createKeyedBundle(null, "baz", groupedKvs); when(evaluationContext.createKeyedBundle(inputBundle, "foo", groupedKvs)).thenReturn(fooBundle); when(evaluationContext.createKeyedBundle(inputBundle, "bar", groupedKvs)).thenReturn(barBundle); diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java index 52398cf73a..455fcca120 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java @@ -82,6 +82,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { private transient PCollection flattened; private transient InMemoryWatermarkManager manager; + private transient BundleFactory bundleFactory; @Before public void setup() { @@ -129,6 +130,7 @@ public void processElement(DoFn.ProcessContext c) throws Excep clock = MockClock.fromInstant(new Instant(1000)); manager = InMemoryWatermarkManager.create(clock, rootTransforms, consumers); + bundleFactory = InProcessBundleFactory.create(); } /** @@ -244,7 +246,7 @@ public void getWatermarkForMultiInputTransform() { assertThat(withBufferedElements.getOutputWatermark(), equalTo(firstCollectionTimestamp)); CommittedBundle completedFlattenBundle = - InProcessBundle.unkeyed(flattened).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); + bundleFactory.createRootBundle(flattened).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); manager.updateWatermarks(firstPcollectionBundle, 
flattened.getProducingTransformInternal(), TimerUpdate.empty(), Collections.>singleton(completedFlattenBundle), null); @@ -340,13 +342,13 @@ public void updateWatermarkWithWatermarkHolds() { @Test public void updateWatermarkWithKeyedWatermarkHolds() { CommittedBundle firstKeyBundle = - InProcessBundle.keyed(createdInts, "Odd") + bundleFactory.createKeyedBundle(null, "Odd", createdInts) .add(WindowedValue.timestampedValueInGlobalWindow(1, new Instant(1_000_000L))) .add(WindowedValue.timestampedValueInGlobalWindow(3, new Instant(-1000L))) .commit(clock.now()); CommittedBundle secondKeyBundle = - InProcessBundle.keyed(createdInts, "Even") + bundleFactory.createKeyedBundle(null, "Even", createdInts) .add(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1234L))) .commit(clock.now()); @@ -366,7 +368,7 @@ public void updateWatermarkWithKeyedWatermarkHolds() { assertThat(filteredWatermarks.getOutputWatermark(), not(laterThan(new Instant(-1000L)))); CommittedBundle fauxFirstKeyTimerBundle = - InProcessBundle.keyed(createdInts, "Odd").commit(clock.now()); + bundleFactory.createKeyedBundle(null, "Odd", createdInts).commit(clock.now()); manager.updateWatermarks(fauxFirstKeyTimerBundle, filtered.getProducingTransformInternal(), TimerUpdate.empty(), Collections.>emptyList(), BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -374,7 +376,7 @@ public void updateWatermarkWithKeyedWatermarkHolds() { assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(1234L))); CommittedBundle fauxSecondKeyTimerBundle = - InProcessBundle.keyed(createdInts, "Even").commit(clock.now()); + bundleFactory.createKeyedBundle(null, "Even", createdInts).commit(clock.now()); manager.updateWatermarks(fauxSecondKeyTimerBundle, filtered.getProducingTransformInternal(), TimerUpdate.empty(), Collections.>emptyList(), new Instant(5678L)); assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(5678L))); @@ -394,7 +396,7 @@ public void updateWatermarkWithKeyedWatermarkHolds() { 
@Test public void updateOutputWatermarkShouldBeMonotonic() { CommittedBundle firstInput = - InProcessBundle.unkeyed(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); + bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), Collections.>singleton(firstInput), new Instant(0L)); TransformWatermarks firstWatermarks = @@ -402,7 +404,7 @@ public void updateOutputWatermarkShouldBeMonotonic() { assertThat(firstWatermarks.getOutputWatermark(), equalTo(new Instant(0L))); CommittedBundle secondInput = - InProcessBundle.unkeyed(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); + bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), Collections.>singleton(secondInput), new Instant(-250L)); TransformWatermarks secondWatermarks = @@ -577,7 +579,7 @@ public void getSynchronizedProcessingTimeInputWatermarksHeldToPendingBundles() { not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE))); CommittedBundle createOutput = - InProcessBundle.unkeyed(createdInts).commit(new Instant(1250L)); + bundleFactory.createRootBundle(createdInts).commit(new Instant(1250L)); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), Collections.>singleton(createOutput), BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -604,7 +606,7 @@ public void getSynchronizedProcessingTimeInputWatermarksHeldToPendingBundles() { not(laterThan(new Instant(1250L)))); CommittedBundle filterOutputBundle = - InProcessBundle.unkeyed(intsToFlatten).commit(new Instant(1250L)); + bundleFactory.createRootBundle(intsToFlatten).commit(new Instant(1250L)); manager.updateWatermarks(createOutput, filtered.getProducingTransformInternal(), TimerUpdate.empty(), Collections.>singleton(filterOutputBundle), BoundedWindow.TIMESTAMP_MAX_VALUE); @@ 
-671,9 +673,11 @@ public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() { assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), not(laterThan(startTime))); CommittedBundle filteredTimerBundle = - InProcessBundle.keyed(filtered, "key").commit(BoundedWindow.TIMESTAMP_MAX_VALUE); + bundleFactory + .createKeyedBundle(null, "key", filtered) + .commit(BoundedWindow.TIMESTAMP_MAX_VALUE); CommittedBundle filteredTimerResult = - InProcessBundle.keyed(filteredTimesTwo, "key") + bundleFactory.createKeyedBundle(null, "key", filteredTimesTwo) .commit(filteredWms.getSynchronizedProcessingOutputTime()); // Complete the processing time timer manager.updateWatermarks(filteredTimerBundle, filtered.getProducingTransformInternal(), @@ -723,7 +727,7 @@ public void getSynchronizedProcessingTimeOutputTimeIsMonotonic() { not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE))); CommittedBundle createOutput = - InProcessBundle.unkeyed(createdInts).commit(new Instant(1250L)); + bundleFactory.createRootBundle(createdInts).commit(new Instant(1250L)); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), Collections.>singleton(createOutput), BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -734,7 +738,7 @@ public void getSynchronizedProcessingTimeOutputTimeIsMonotonic() { createAfterUpdate.getSynchronizedProcessingOutputTime(), not(laterThan(clock.now()))); CommittedBundle createSecondOutput = - InProcessBundle.unkeyed(createdInts).commit(new Instant(750L)); + bundleFactory.createRootBundle(createdInts).commit(new Instant(750L)); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), Collections.>singleton(createSecondOutput), BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -782,13 +786,20 @@ public void synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers( @Test public void synchronizedProcessingInputTimeIsHeldToPendingBundleTimes() { CommittedBundle created = 
globallyWindowedBundle(createdInts, 1, 2, 3); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(created), new Instant(29_919_235L)); + manager.updateWatermarks( + null, + createdInts.getProducingTransformInternal(), + TimerUpdate.empty(), + Collections.>singleton(created), + new Instant(29_919_235L)); Instant upstreamHold = new Instant(2048L); CommittedBundle filteredBundle = - InProcessBundle.keyed(filtered, "key").commit(upstreamHold); - manager.updateWatermarks(created, filtered.getProducingTransformInternal(), TimerUpdate.empty(), + bundleFactory.createKeyedBundle(null, "key", filtered).commit(upstreamHold); + manager.updateWatermarks( + created, + filtered.getProducingTransformInternal(), + TimerUpdate.empty(), Collections.>singleton(filteredBundle), BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -1092,7 +1103,7 @@ public void describeTo(Description description) { @SafeVarargs private final CommittedBundle timestampedBundle( PCollection pc, TimestampedValue... values) { - UncommittedBundle bundle = InProcessBundle.unkeyed(pc); + UncommittedBundle bundle = bundleFactory.createRootBundle(pc); for (TimestampedValue value : values) { bundle.add( WindowedValue.timestampedValueInGlobalWindow(value.getValue(), value.getTimestamp())); @@ -1102,7 +1113,7 @@ private final CommittedBundle timestampedBundle( @SafeVarargs private final CommittedBundle globallyWindowedBundle(PCollection pc, T... 
values) { - UncommittedBundle bundle = InProcessBundle.unkeyed(pc); + UncommittedBundle bundle = bundleFactory.createRootBundle(pc); for (T value : values) { bundle.add(WindowedValue.valueInGlobalWindow(value)); } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java index 05e38babc6..6268b4b771 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java @@ -110,6 +110,7 @@ public void setup() { context = InProcessEvaluationContext.create( runner.getPipelineOptions(), + InProcessBundleFactory.create(), rootTransforms, valueToConsumers, cVis.getStepNames(), @@ -156,7 +157,9 @@ public void getExecutionContextSameStepSameKeyState() { stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1); context.handleResult( - InProcessBundle.keyed(created, "foo").commit(Instant.now()), + InProcessBundleFactory.create() + .createKeyedBundle(null, "foo", created) + .commit(Instant.now()), ImmutableList.of(), StepTransformResult.withoutHold(created.getProducingTransformInternal()) .withState(stepContext.commitState()) @@ -248,7 +251,7 @@ public void handleResultMergesCounters() { .withCounters(againCounters) .build(); context.handleResult( - InProcessBundle.unkeyed(created).commit(Instant.now()), + context.createRootBundle(created).commit(Instant.now()), ImmutableList.of(), secondResult); assertThat((Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(12L)); @@ -275,7 +278,7 @@ public void handleResultStoresState() { .build(); context.handleResult( - InProcessBundle.keyed(created, myKey).commit(Instant.now()), + context.createKeyedBundle(null, myKey, created).commit(Instant.now()), ImmutableList.of(), stateResult); 
@@ -357,7 +360,7 @@ public void extractFiredTimersExtractsTimers() { // haven't added any timers, must be empty assertThat(context.extractFiredTimers().entrySet(), emptyIterable()); context.handleResult( - InProcessBundle.keyed(created, key).commit(Instant.now()), + context.createKeyedBundle(null, key, created).commit(Instant.now()), ImmutableList.of(), timerResult); diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java index 66430b6a7f..50a7ed9180 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java @@ -67,6 +67,8 @@ */ @RunWith(JUnit4.class) public class ParDoMultiEvaluatorFactoryTest implements Serializable { + private transient BundleFactory bundleFactory = InProcessBundleFactory.create(); + @Test public void testParDoMultiInMemoryTransformEvaluator() throws Exception { TestPipeline p = TestPipeline.create(); @@ -78,26 +80,30 @@ public void testParDoMultiInMemoryTransformEvaluator() throws Exception { final TupleTag lengthTag = new TupleTag<>(); BoundMulti> pardo = - ParDo.of(new DoFn>() { - @Override - public void processElement(ProcessContext c) { - c.output(KV.of(c.element(), c.element().length())); - c.sideOutput(elementTag, c.element()); - c.sideOutput(lengthTag, c.element().length()); - } - }).withOutputTags(mainOutputTag, TupleTagList.of(elementTag).and(lengthTag)); + ParDo.of( + new DoFn>() { + @Override + public void processElement(ProcessContext c) { + c.output(KV.of(c.element(), c.element().length())); + c.sideOutput(elementTag, c.element()); + c.sideOutput(lengthTag, c.element().length()); + } + }) + .withOutputTags(mainOutputTag, TupleTagList.of(elementTag).and(lengthTag)); PCollectionTuple outputTuple = 
input.apply(pardo); - CommittedBundle inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now()); + CommittedBundle inputBundle = + bundleFactory.createRootBundle(input).commit(Instant.now()); PCollection> mainOutput = outputTuple.get(mainOutputTag); PCollection elementOutput = outputTuple.get(elementTag); PCollection lengthOutput = outputTuple.get(lengthTag); InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - UncommittedBundle> mainOutputBundle = InProcessBundle.unkeyed(mainOutput); - UncommittedBundle elementOutputBundle = InProcessBundle.unkeyed(elementOutput); - UncommittedBundle lengthOutputBundle = InProcessBundle.unkeyed(lengthOutput); + UncommittedBundle> mainOutputBundle = + bundleFactory.createRootBundle(mainOutput); + UncommittedBundle elementOutputBundle = bundleFactory.createRootBundle(elementOutput); + UncommittedBundle lengthOutputBundle = bundleFactory.createRootBundle(lengthOutput); when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle); when(evaluationContext.createBundle(inputBundle, elementOutput)) @@ -112,8 +118,9 @@ public void processElement(ProcessContext c) { when(evaluationContext.createCounterSet()).thenReturn(counters); com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator evaluator = - new ParDoMultiEvaluatorFactory().forApplication( - mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); + new ParDoMultiEvaluatorFactory() + .forApplication( + mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); evaluator.processElement( @@ -161,24 +168,28 @@ public void testParDoMultiUndeclaredSideOutput() throws Exception { final TupleTag lengthTag = new TupleTag<>(); BoundMulti> pardo = - ParDo.of(new DoFn>() { - @Override - public void processElement(ProcessContext c) { - c.output(KV.of(c.element(), c.element().length())); - 
c.sideOutput(elementTag, c.element()); - c.sideOutput(lengthTag, c.element().length()); - } - }).withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); + ParDo.of( + new DoFn>() { + @Override + public void processElement(ProcessContext c) { + c.output(KV.of(c.element(), c.element().length())); + c.sideOutput(elementTag, c.element()); + c.sideOutput(lengthTag, c.element().length()); + } + }) + .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); PCollectionTuple outputTuple = input.apply(pardo); - CommittedBundle inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now()); + CommittedBundle inputBundle = + bundleFactory.createRootBundle(input).commit(Instant.now()); PCollection> mainOutput = outputTuple.get(mainOutputTag); PCollection elementOutput = outputTuple.get(elementTag); InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - UncommittedBundle> mainOutputBundle = InProcessBundle.unkeyed(mainOutput); - UncommittedBundle elementOutputBundle = InProcessBundle.unkeyed(elementOutput); + UncommittedBundle> mainOutputBundle = + bundleFactory.createRootBundle(mainOutput); + UncommittedBundle elementOutputBundle = bundleFactory.createRootBundle(elementOutput); when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle); when(evaluationContext.createBundle(inputBundle, elementOutput)) @@ -192,8 +203,9 @@ public void processElement(ProcessContext c) { when(evaluationContext.createCounterSet()).thenReturn(counters); com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator evaluator = - new ParDoMultiEvaluatorFactory().forApplication( - mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); + new ParDoMultiEvaluatorFactory() + .forApplication( + mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); evaluator.processElement( @@ -204,8 +216,7 @@ public void 
processElement(ProcessContext c) { InProcessTransformResult result = evaluator.finishBundle(); assertThat( result.getOutputBundles(), - Matchers.>containsInAnyOrder( - mainOutputBundle, elementOutputBundle)); + Matchers.>containsInAnyOrder(mainOutputBundle, elementOutputBundle)); assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); assertThat(result.getCounters(), equalTo(counters)); @@ -259,14 +270,16 @@ public void processElement(ProcessContext c) { .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); PCollectionTuple outputTuple = input.apply(pardo); - CommittedBundle inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now()); + CommittedBundle inputBundle = + bundleFactory.createRootBundle(input).commit(Instant.now()); PCollection> mainOutput = outputTuple.get(mainOutputTag); PCollection elementOutput = outputTuple.get(elementTag); InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - UncommittedBundle> mainOutputBundle = InProcessBundle.unkeyed(mainOutput); - UncommittedBundle elementOutputBundle = InProcessBundle.unkeyed(elementOutput); + UncommittedBundle> mainOutputBundle = + bundleFactory.createRootBundle(mainOutput); + UncommittedBundle elementOutputBundle = bundleFactory.createRootBundle(elementOutput); when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle); when(evaluationContext.createBundle(inputBundle, elementOutput)) @@ -280,8 +293,9 @@ public void processElement(ProcessContext c) { when(evaluationContext.createCounterSet()).thenReturn(counters); com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator evaluator = - new ParDoMultiEvaluatorFactory().forApplication( - mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); + new ParDoMultiEvaluatorFactory() + .forApplication( + mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); 
evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); evaluator.processElement( @@ -366,14 +380,16 @@ public void processElement(ProcessContext c) { .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); PCollectionTuple outputTuple = input.apply(pardo); - CommittedBundle inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now()); + CommittedBundle inputBundle = + bundleFactory.createRootBundle(input).commit(Instant.now()); PCollection> mainOutput = outputTuple.get(mainOutputTag); PCollection elementOutput = outputTuple.get(elementTag); InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - UncommittedBundle> mainOutputBundle = InProcessBundle.unkeyed(mainOutput); - UncommittedBundle elementOutputBundle = InProcessBundle.unkeyed(elementOutput); + UncommittedBundle> mainOutputBundle = + bundleFactory.createRootBundle(mainOutput); + UncommittedBundle elementOutputBundle = bundleFactory.createRootBundle(elementOutput); when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle); when(evaluationContext.createBundle(inputBundle, elementOutput)) @@ -387,8 +403,9 @@ public void processElement(ProcessContext c) { when(evaluationContext.createCounterSet()).thenReturn(counters); com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator evaluator = - new ParDoMultiEvaluatorFactory().forApplication( - mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); + new ParDoMultiEvaluatorFactory() + .forApplication( + mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); evaluator.processElement( diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java index 3b928b9077..8cec0ba609 100644 --- 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java @@ -64,21 +64,27 @@ */ @RunWith(JUnit4.class) public class ParDoSingleEvaluatorFactoryTest implements Serializable { + private transient BundleFactory bundleFactory = InProcessBundleFactory.create(); + @Test public void testParDoInMemoryTransformEvaluator() throws Exception { TestPipeline p = TestPipeline.create(); PCollection input = p.apply(Create.of("foo", "bara", "bazam")); - PCollection collection = input.apply(ParDo.of(new DoFn() { - @Override public void processElement(ProcessContext c) { - c.output(c.element().length()); - } - })); - CommittedBundle inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now()); + PCollection collection = + input.apply( + ParDo.of( + new DoFn() { + @Override + public void processElement(ProcessContext c) { + c.output(c.element().length()); + } + })); + CommittedBundle inputBundle = + bundleFactory.createRootBundle(input).commit(Instant.now()); InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - UncommittedBundle outputBundle = - InProcessBundle.unkeyed(collection); + UncommittedBundle outputBundle = bundleFactory.createRootBundle(collection); when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle); InProcessExecutionContext executionContext = new InProcessExecutionContext(null, null, null, null); @@ -88,8 +94,9 @@ public void testParDoInMemoryTransformEvaluator() throws Exception { when(evaluationContext.createCounterSet()).thenReturn(counters); com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator evaluator = - new ParDoSingleEvaluatorFactory().forApplication( - collection.getProducingTransformInternal(), inputBundle, evaluationContext); + new ParDoSingleEvaluatorFactory() + .forApplication( + 
collection.getProducingTransformInternal(), inputBundle, evaluationContext); evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); evaluator.processElement( @@ -116,16 +123,20 @@ public void testSideOutputToUndeclaredSideOutputSucceeds() throws Exception { PCollection input = p.apply(Create.of("foo", "bara", "bazam")); final TupleTag sideOutputTag = new TupleTag() {}; - PCollection collection = input.apply(ParDo.of(new DoFn() { - @Override public void processElement(ProcessContext c) { - c.sideOutput(sideOutputTag, c.element().length()); - } - })); - CommittedBundle inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now()); + PCollection collection = + input.apply( + ParDo.of( + new DoFn() { + @Override + public void processElement(ProcessContext c) { + c.sideOutput(sideOutputTag, c.element().length()); + } + })); + CommittedBundle inputBundle = + bundleFactory.createRootBundle(input).commit(Instant.now()); InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - UncommittedBundle outputBundle = - InProcessBundle.unkeyed(collection); + UncommittedBundle outputBundle = bundleFactory.createRootBundle(collection); when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle); InProcessExecutionContext executionContext = new InProcessExecutionContext(null, null, null, null); @@ -135,8 +146,9 @@ public void testSideOutputToUndeclaredSideOutputSucceeds() throws Exception { when(evaluationContext.createCounterSet()).thenReturn(counters); TransformEvaluator evaluator = - new ParDoSingleEvaluatorFactory().forApplication( - collection.getProducingTransformInternal(), inputBundle, evaluationContext); + new ParDoSingleEvaluatorFactory() + .forApplication( + collection.getProducingTransformInternal(), inputBundle, evaluationContext); evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); evaluator.processElement( @@ -181,10 +193,12 @@ public void processElement(ProcessContext c) { }); 
PCollection> mainOutput = input.apply(pardo); - CommittedBundle inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now()); + CommittedBundle inputBundle = + bundleFactory.createRootBundle(input).commit(Instant.now()); InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - UncommittedBundle> mainOutputBundle = InProcessBundle.unkeyed(mainOutput); + UncommittedBundle> mainOutputBundle = + bundleFactory.createRootBundle(mainOutput); when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle); @@ -244,42 +258,44 @@ public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception ParDo.Bound> pardo = ParDo.of( - new DoFn>() { - @Override - public void processElement(ProcessContext c) { - c.windowingInternals().stateInternals(); - c.windowingInternals() - .timerInternals() - .setTimer( - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow( - new Instant(0).plus(Duration.standardMinutes(5)), - new Instant(1) - .plus(Duration.standardMinutes(5)) - .plus(Duration.standardHours(1)))), - new Instant(54541L), - TimeDomain.EVENT_TIME)); - c.windowingInternals() - .timerInternals() - .deleteTimer( - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow( - new Instant(0), - new Instant(0).plus(Duration.standardHours(1)))), - new Instant(3400000), - TimeDomain.SYNCHRONIZED_PROCESSING_TIME)); - } - }); + new DoFn>() { + @Override + public void processElement(ProcessContext c) { + c.windowingInternals().stateInternals(); + c.windowingInternals() + .timerInternals() + .setTimer( + TimerData.of( + StateNamespaces.window( + IntervalWindow.getCoder(), + new IntervalWindow( + new Instant(0).plus(Duration.standardMinutes(5)), + new Instant(1) + .plus(Duration.standardMinutes(5)) + .plus(Duration.standardHours(1)))), + new Instant(54541L), + TimeDomain.EVENT_TIME)); + c.windowingInternals() + .timerInternals() + .deleteTimer( + 
TimerData.of( + StateNamespaces.window( + IntervalWindow.getCoder(), + new IntervalWindow( + new Instant(0), + new Instant(0).plus(Duration.standardHours(1)))), + new Instant(3400000), + TimeDomain.SYNCHRONIZED_PROCESSING_TIME)); + } + }); PCollection> mainOutput = input.apply(pardo); - CommittedBundle inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now()); + CommittedBundle inputBundle = + bundleFactory.createRootBundle(input).commit(Instant.now()); InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - UncommittedBundle> mainOutputBundle = InProcessBundle.unkeyed(mainOutput); + UncommittedBundle> mainOutputBundle = + bundleFactory.createRootBundle(mainOutput); when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle); @@ -301,10 +317,7 @@ public void processElement(ProcessContext c) { assertThat( result.getTimerUpdate(), equalTo( - TimerUpdate.builder("myKey") - .setTimer(addedTimer) - .deletedTimer(deletedTimer) - .build())); + TimerUpdate.builder("myKey").setTimer(addedTimer).deletedTimer(deletedTimer).build())); } } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java index 7741263c77..b14b34bb24 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java @@ -73,6 +73,7 @@ public class TransformExecutorTest { private RegisteringCompletionCallback completionCallback; private TransformExecutorService transformEvaluationState; + private BundleFactory bundleFactory; @Mock private InProcessEvaluationContext evaluationContext; @Mock private TransformEvaluatorRegistry registry; private Map, Boolean> scheduled; @@ -81,6 +82,8 @@ public class TransformExecutorTest { public void setup() { 
MockitoAnnotations.initMocks(this); + bundleFactory = InProcessBundleFactory.create(); + scheduled = new HashMap<>(); transformEvaluationState = TransformExecutorServices.parallel(MoreExecutors.newDirectExecutorService(), scheduled); @@ -155,7 +158,7 @@ public InProcessTransformResult finishBundle() throws Exception { WindowedValue spam = WindowedValue.valueInGlobalWindow("spam"); WindowedValue third = WindowedValue.valueInGlobalWindow("third"); CommittedBundle inputBundle = - InProcessBundle.unkeyed(created).add(foo).add(spam).add(third).commit(Instant.now()); + bundleFactory.createRootBundle(created).add(foo).add(spam).add(third).commit(Instant.now()); when( registry.forApplication( downstream.getProducingTransformInternal(), inputBundle, evaluationContext)) @@ -201,7 +204,7 @@ public InProcessTransformResult finishBundle() throws Exception { WindowedValue foo = WindowedValue.valueInGlobalWindow("foo"); CommittedBundle inputBundle = - InProcessBundle.unkeyed(created).add(foo).commit(Instant.now()); + bundleFactory.createRootBundle(created).add(foo).commit(Instant.now()); when( registry.forApplication( downstream.getProducingTransformInternal(), inputBundle, evaluationContext)) @@ -239,7 +242,8 @@ public InProcessTransformResult finishBundle() throws Exception { } }; - CommittedBundle inputBundle = InProcessBundle.unkeyed(created).commit(Instant.now()); + CommittedBundle inputBundle = + bundleFactory.createRootBundle(created).commit(Instant.now()); when( registry.forApplication( downstream.getProducingTransformInternal(), inputBundle, evaluationContext)) diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java index 20a7d60bce..ab0b939597 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java +++ 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java @@ -70,6 +70,8 @@ public class UnboundedReadEvaluatorFactoryTest { private InProcessEvaluationContext context; private UncommittedBundle output; + private BundleFactory bundleFactory = InProcessBundleFactory.create(); + @Before public void setup() { UnboundedSource source = @@ -79,7 +81,7 @@ public void setup() { factory = new UnboundedReadEvaluatorFactory(); context = mock(InProcessEvaluationContext.class); - output = InProcessBundle.unkeyed(longs); + output = bundleFactory.createRootBundle(longs); when(context.createRootBundle(longs)).thenReturn(output); } @@ -116,7 +118,7 @@ public void unboundedSourceInMemoryTransformEvaluatorMultipleSequentialCalls() t tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), tgw(3L), tgw(0L))); - UncommittedBundle secondOutput = InProcessBundle.unkeyed(longs); + UncommittedBundle secondOutput = bundleFactory.createRootBundle(longs); when(context.createRootBundle(longs)).thenReturn(secondOutput); TransformEvaluator secondEvaluator = factory.forApplication(longs.getProducingTransformInternal(), null, context); @@ -139,6 +141,7 @@ public void boundedSourceEvaluatorClosesReader() throws Exception { PCollection pcollection = p.apply(Read.from(source)); AppliedPTransform sourceTransform = pcollection.getProducingTransformInternal(); + UncommittedBundle output = bundleFactory.createRootBundle(pcollection); when(context.createRootBundle(pcollection)).thenReturn(output); TransformEvaluator evaluator = factory.forApplication(sourceTransform, null, context); @@ -156,6 +159,7 @@ public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception { PCollection pcollection = p.apply(Read.from(source)); AppliedPTransform sourceTransform = pcollection.getProducingTransformInternal(); + UncommittedBundle output = bundleFactory.createRootBundle(pcollection); 
when(context.createRootBundle(pcollection)).thenReturn(output); TransformEvaluator evaluator = factory.forApplication(sourceTransform, null, context); @@ -173,7 +177,7 @@ public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception { */ @Test public void unboundedSourceWithMultipleSimultaneousEvaluatorsIndependent() throws Exception { - UncommittedBundle secondOutput = InProcessBundle.unkeyed(longs); + UncommittedBundle secondOutput = bundleFactory.createRootBundle(longs); TransformEvaluator evaluator = factory.forApplication(longs.getProducingTransformInternal(), null, context); diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java index 2f5bdde6ad..26cafa74cc 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java @@ -48,6 +48,8 @@ */ @RunWith(JUnit4.class) public class ViewEvaluatorFactoryTest { + private BundleFactory bundleFactory = InProcessBundleFactory.create(); + @Test public void testInMemoryEvaluator() throws Exception { TestPipeline p = TestPipeline.create(); @@ -68,7 +70,8 @@ public void testInMemoryEvaluator() throws Exception { TestViewWriter> viewWriter = new TestViewWriter<>(); when(context.createPCollectionViewWriter(concat, view)).thenReturn(viewWriter); - CommittedBundle inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now()); + CommittedBundle inputBundle = + bundleFactory.createRootBundle(input).commit(Instant.now()); TransformEvaluator> evaluator = new ViewEvaluatorFactory() .forApplication(view.getProducingTransformInternal(), inputBundle, context); diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java 
b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java
new file mode 100644
index 0000000000..40b1d5a69e
--- /dev/null
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.isA;
+import static org.junit.Assert.assertThat;
+
+import com.google.cloud.dataflow.sdk.coders.ByteArrayCoder;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ImmutabilityCheckingBundleFactory}.
+ *
+ * <p>Each scenario (no mutation, mutation before {@code add}, mutation after {@code add}) is
+ * exercised against root, keyed and derived bundles. Only a mutation made after an element has
+ * been added is expected to be rejected, and the tests expect that rejection from {@code commit}.
+ */
+@RunWith(JUnit4.class)
+public class ImmutabilityCheckingBundleFactoryTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  private ImmutabilityCheckingBundleFactory factory;
+  private PCollection<byte[]> created;
+  private PCollection<byte[]> transformed;
+
+  /** Builds a two-step pipeline and wraps an {@link InProcessBundleFactory} with the checker. */
+  @Before
+  public void setup() {
+    TestPipeline p = TestPipeline.create();
+    created = p.apply(Create.<byte[]>of().withCoder(ByteArrayCoder.of()));
+    transformed = created.apply(ParDo.of(new IdentityDoFn<byte[]>()));
+    factory = ImmutabilityCheckingBundleFactory.create(InProcessBundleFactory.create());
+  }
+
+  @Test
+  public void noMutationRootBundleSucceeds() {
+    UncommittedBundle<byte[]> root = factory.createRootBundle(created);
+    byte[] array = new byte[] {0, 1, 2};
+    root.add(WindowedValue.valueInGlobalWindow(array));
+    CommittedBundle<byte[]> committed = root.commit(Instant.now());
+
+    assertThat(
+        committed.getElements(), containsInAnyOrder(WindowedValue.valueInGlobalWindow(array)));
+  }
+
+  @Test
+  public void noMutationKeyedBundleSucceeds() {
+    CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
+    UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", transformed);
+
+    WindowedValue<byte[]> windowedArray =
+        WindowedValue.of(
+            new byte[] {4, 8, 12},
+            new Instant(891L),
+            new IntervalWindow(new Instant(0), new Instant(1000)),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    keyed.add(windowedArray);
+
+    CommittedBundle<byte[]> committed = keyed.commit(Instant.now());
+    assertThat(committed.getElements(), containsInAnyOrder(windowedArray));
+  }
+
+  @Test
+  public void noMutationCreateBundleSucceeds() {
+    CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
+    UncommittedBundle<byte[]> intermediate = factory.createBundle(root, transformed);
+
+    WindowedValue<byte[]> windowedArray =
+        WindowedValue.of(
+            new byte[] {4, 8, 12},
+            new Instant(891L),
+            new IntervalWindow(new Instant(0), new Instant(1000)),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    intermediate.add(windowedArray);
+
+    CommittedBundle<byte[]> committed = intermediate.commit(Instant.now());
+    assertThat(committed.getElements(), containsInAnyOrder(windowedArray));
+  }
+
+  @Test
+  public void mutationBeforeAddRootBundleSucceeds() {
+    UncommittedBundle<byte[]> root = factory.createRootBundle(created);
+    byte[] array = new byte[] {0, 1, 2};
+    array[1] = 2;
+    root.add(WindowedValue.valueInGlobalWindow(array));
+    CommittedBundle<byte[]> committed = root.commit(Instant.now());
+
+    assertThat(
+        committed.getElements(), containsInAnyOrder(WindowedValue.valueInGlobalWindow(array)));
+  }
+
+  @Test
+  public void mutationBeforeAddKeyedBundleSucceeds() {
+    CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
+    UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", transformed);
+
+    byte[] array = new byte[] {4, 8, 12};
+    array[0] = Byte.MAX_VALUE;
+    WindowedValue<byte[]> windowedArray =
+        WindowedValue.of(
+            array,
+            new Instant(891L),
+            new IntervalWindow(new Instant(0), new Instant(1000)),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    keyed.add(windowedArray);
+
+    CommittedBundle<byte[]> committed = keyed.commit(Instant.now());
+    assertThat(committed.getElements(), containsInAnyOrder(windowedArray));
+  }
+
+  @Test
+  public void mutationBeforeAddCreateBundleSucceeds() {
+    CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
+    UncommittedBundle<byte[]> intermediate = factory.createBundle(root, transformed);
+
+    byte[] array = new byte[] {4, 8, 12};
+    WindowedValue<byte[]> windowedArray =
+        WindowedValue.of(
+            array,
+            new Instant(891L),
+            new IntervalWindow(new Instant(0), new Instant(1000)),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    // Already wrapped in a WindowedValue, but not yet added to the bundle: still permitted.
+    array[2] = -3;
+    intermediate.add(windowedArray);
+
+    CommittedBundle<byte[]> committed = intermediate.commit(Instant.now());
+    assertThat(committed.getElements(), containsInAnyOrder(windowedArray));
+  }
+
+  @Test
+  public void mutationAfterAddRootBundleThrows() {
+    UncommittedBundle<byte[]> root = factory.createRootBundle(created);
+    byte[] array = new byte[] {0, 1, 2};
+    root.add(WindowedValue.valueInGlobalWindow(array));
+
+    // Mutating after add() is the violation under test; commit() is expected to throw.
+    array[1] = 2;
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(IllegalMutationException.class));
+    thrown.expectMessage("Values must not be mutated in any way after being output");
+    root.commit(Instant.now());
+  }
+
+  @Test
+  public void mutationAfterAddKeyedBundleThrows() {
+    CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
+    UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", transformed);
+
+    byte[] array = new byte[] {4, 8, 12};
+    WindowedValue<byte[]> windowedArray =
+        WindowedValue.of(
+            array,
+            new Instant(891L),
+            new IntervalWindow(new Instant(0), new Instant(1000)),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    keyed.add(windowedArray);
+
+    // Mutating after add() is the violation under test; commit() is expected to throw.
+    array[0] = Byte.MAX_VALUE;
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(IllegalMutationException.class));
+    thrown.expectMessage("Values must not be mutated in any way after being output");
+    keyed.commit(Instant.now());
+  }
+
+  @Test
+  public void mutationAfterAddCreateBundleThrows() {
+    CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
+    UncommittedBundle<byte[]> intermediate = factory.createBundle(root, transformed);
+
+    byte[] array = new byte[] {4, 8, 12};
+    WindowedValue<byte[]> windowedArray =
+        WindowedValue.of(
+            array,
+            new Instant(891L),
+            new IntervalWindow(new Instant(0), new Instant(1000)),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    intermediate.add(windowedArray);
+
+    // Mutating after add() is the violation under test; commit() is expected to throw.
+    array[2] = -3;
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(IllegalMutationException.class));
+    thrown.expectMessage("Values must not be mutated in any way after being output");
+    intermediate.commit(Instant.now());
+  }
+
+  /** A {@link DoFn} that outputs every input element unchanged. */
+  private static class IdentityDoFn<T> extends DoFn<T, T> {
+    @Override
+    public void processElement(DoFn<T, T>.ProcessContext c) throws Exception {
+      c.output(c.element());
+    }
+  }
+}
diff --git
a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactoryTest.java
new file mode 100644
index 0000000000..060d43cfe7
--- /dev/null
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactoryTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.WithKeys;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.collect.ImmutableList;
+
+import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Tests for {@link InProcessBundleFactory}.
+ *
+ * <p>Covers keyed-ness of root, keyed and derived bundles, element round-tripping through
+ * {@code commit}, and rejection of {@code add}/{@code commit} on an already committed bundle.
+ */
+@RunWith(JUnit4.class)
+public class InProcessBundleFactoryTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private InProcessBundleFactory bundleFactory = InProcessBundleFactory.create();
+
+  private PCollection<Integer> created;
+  private PCollection<KV<String, Integer>> downstream;
+
+  @Before
+  public void setup() {
+    TestPipeline p = TestPipeline.create();
+    created = p.apply(Create.of(1, 2, 3));
+    downstream = created.apply(WithKeys.<String, Integer>of("foo"));
+  }
+
+  @Test
+  public void createRootBundleShouldCreateWithNullKey() {
+    PCollection<Integer> pcollection = TestPipeline.create().apply(Create.of(1));
+
+    UncommittedBundle<Integer> inFlightBundle = bundleFactory.createRootBundle(pcollection);
+
+    CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now());
+
+    assertThat(bundle.isKeyed(), is(false));
+    assertThat(bundle.getKey(), nullValue());
+  }
+
+  /** Asserts that a bundle created with {@code key} is keyed and reports exactly that key. */
+  private void createKeyedBundle(Object key) {
+    PCollection<Integer> pcollection = TestPipeline.create().apply(Create.of(1));
+
+    UncommittedBundle<Integer> inFlightBundle =
+        bundleFactory.createKeyedBundle(null, key, pcollection);
+
+    CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now());
+    assertThat(bundle.isKeyed(), is(true));
+    assertThat(bundle.getKey(), equalTo(key));
+  }
+
+  @Test
+  public void keyedWithNullKeyShouldCreateKeyedBundle() {
+    createKeyedBundle(null);
+  }
+
+  @Test
+  public void keyedWithKeyShouldCreateKeyedBundle() {
+    createKeyedBundle(new Object());
+  }
+
+  /** Adds {@code elems} to a root bundle and asserts the committed bundle holds exactly them. */
+  private <T> void afterCommitGetElementsShouldHaveAddedElements(Iterable<WindowedValue<T>> elems) {
+    PCollection<T> pcollection = TestPipeline.create().apply(Create.<T>of());
+
+    UncommittedBundle<T> bundle = bundleFactory.createRootBundle(pcollection);
+    Collection<Matcher<? super WindowedValue<T>>> expectations = new ArrayList<>();
+    for (WindowedValue<T> elem : elems) {
+      bundle.add(elem);
+      expectations.add(equalTo(elem));
+    }
+    Matcher<Iterable<? extends WindowedValue<T>>> containsMatcher =
+        Matchers.<WindowedValue<T>>containsInAnyOrder(expectations);
+    assertThat(bundle.commit(Instant.now()).getElements(), containsMatcher);
+  }
+
+  @Test
+  public void getElementsBeforeAddShouldReturnEmptyIterable() {
+    afterCommitGetElementsShouldHaveAddedElements(Collections.<WindowedValue<Integer>>emptyList());
+  }
+
+  @Test
+  public void getElementsAfterAddShouldReturnAddedElements() {
+    WindowedValue<Integer> firstValue = WindowedValue.valueInGlobalWindow(1);
+    WindowedValue<Integer> secondValue =
+        WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1000L));
+
+    afterCommitGetElementsShouldHaveAddedElements(ImmutableList.of(firstValue, secondValue));
+  }
+
+  @Test
+  public void addAfterCommitShouldThrowException() {
+    PCollection<Integer> pcollection = TestPipeline.create().apply(Create.<Integer>of());
+
+    UncommittedBundle<Integer> bundle = bundleFactory.createRootBundle(pcollection);
+    bundle.add(WindowedValue.valueInGlobalWindow(1));
+    CommittedBundle<Integer> firstCommit = bundle.commit(Instant.now());
+    assertThat(firstCommit.getElements(), containsInAnyOrder(WindowedValue.valueInGlobalWindow(1)));
+
+    // "3" is the value of the element rejected below; the message is expected to include it.
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("3");
+    thrown.expectMessage("committed");
+
+    bundle.add(WindowedValue.valueInGlobalWindow(3));
+  }
+
+  @Test
+  public void commitAfterCommitShouldThrowException() {
+    PCollection<Integer> pcollection = TestPipeline.create().apply(Create.<Integer>of());
+
+    UncommittedBundle<Integer> bundle = bundleFactory.createRootBundle(pcollection);
+    bundle.add(WindowedValue.valueInGlobalWindow(1));
+    CommittedBundle<Integer> firstCommit = bundle.commit(Instant.now());
+    assertThat(firstCommit.getElements(), containsInAnyOrder(WindowedValue.valueInGlobalWindow(1)));
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("committed");
+
+    bundle.commit(Instant.now());
+  }
+
+  @Test
+  public void createBundleUnkeyedResultUnkeyed() {
+    CommittedBundle<KV<String, Integer>> newBundle =
+        bundleFactory
+            .createBundle(bundleFactory.createRootBundle(created).commit(Instant.now()), downstream)
+            .commit(Instant.now());
+    assertThat(newBundle.isKeyed(), is(false));
+  }
+
+  @Test
+  public void createBundleKeyedResultPropagatesKey() {
+    CommittedBundle<KV<String, Integer>> newBundle =
+        bundleFactory
+            .createBundle(
+                bundleFactory.createKeyedBundle(null, "foo", created).commit(Instant.now()),
+                downstream)
+            .commit(Instant.now());
+    assertThat(newBundle.isKeyed(), is(true));
+    assertThat(newBundle.getKey(), Matchers.<Object>equalTo("foo"));
+  }
+
+  @Test
+  public void createRootBundleUnkeyed() {
+    assertThat(bundleFactory.createRootBundle(created).commit(Instant.now()).isKeyed(), is(false));
+  }
+
+  @Test
+  public void createKeyedBundleKeyed() {
+    CommittedBundle<KV<String, Integer>> keyedBundle =
+        bundleFactory
+            .createKeyedBundle(
+                bundleFactory.createRootBundle(created).commit(Instant.now()), "foo", downstream)
+            .commit(Instant.now());
+    assertThat(keyedBundle.isKeyed(), is(true));
+    assertThat(keyedBundle.getKey(), Matchers.<Object>equalTo("foo"));
+  }
+}