Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Commit 18dd3cd

Browse files
committed
Merge pull request #160 from dhalperi/backport-52
Backport apache/beam#52
2 parents 972985c + 931eb73 commit 18dd3cd

File tree

4 files changed

+366
-40
lines changed

4 files changed

+366
-40
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package com.google.cloud.dataflow.sdk.runners.inprocess;
1717

18+
import com.google.cloud.dataflow.sdk.io.BoundedSource;
19+
import com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader;
1820
import com.google.cloud.dataflow.sdk.io.Read.Bounded;
1921
import com.google.cloud.dataflow.sdk.io.Source.Reader;
2022
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
@@ -78,8 +80,7 @@ private <OutputT> TransformEvaluator<?> getTransformEvaluator(
7880
@SuppressWarnings("unchecked")
7981
private <OutputT> Queue<BoundedReadEvaluator<OutputT>> getTransformEvaluatorQueue(
8082
final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
81-
final InProcessEvaluationContext evaluationContext)
82-
throws IOException {
83+
final InProcessEvaluationContext evaluationContext) {
8384
// Key by the application and the context the evaluation is occurring in (which call to
8485
// Pipeline#run).
8586
EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
@@ -101,39 +102,51 @@ private <OutputT> Queue<BoundedReadEvaluator<OutputT>> getTransformEvaluatorQueu
101102
return evaluatorQueue;
102103
}
103104

105+
/**
106+
* A {@link BoundedReadEvaluator} produces elements from an underlying {@link BoundedSource},
107+
* discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator
108+
* creates the {@link BoundedReader} and consumes all available input.
109+
*
110+
* <p>A {@link BoundedReadEvaluator} should only be created once per {@link BoundedSource}, and
111+
* each evaluator should only be called once per evaluation of the pipeline. Otherwise, the source
112+
* may produce duplicate elements.
113+
*/
104114
private static class BoundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
105115
private final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform;
106116
private final InProcessEvaluationContext evaluationContext;
107-
private final Reader<OutputT> reader;
108117
private boolean contentsRemaining;
109118

110119
public BoundedReadEvaluator(
111120
AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
112-
InProcessEvaluationContext evaluationContext)
113-
throws IOException {
121+
InProcessEvaluationContext evaluationContext) {
114122
this.transform = transform;
115123
this.evaluationContext = evaluationContext;
116-
reader =
117-
transform.getTransform().getSource().createReader(evaluationContext.getPipelineOptions());
118-
contentsRemaining = reader.start();
119124
}
120125

121126
@Override
122127
public void processElement(WindowedValue<Object> element) {}
123128

124129
@Override
125130
public InProcessTransformResult finishBundle() throws IOException {
126-
UncommittedBundle<OutputT> output = evaluationContext.createRootBundle(transform.getOutput());
127-
while (contentsRemaining) {
128-
output.add(
129-
WindowedValue.timestampedValueInGlobalWindow(
130-
reader.getCurrent(), reader.getCurrentTimestamp()));
131-
contentsRemaining = reader.advance();
131+
try (final Reader<OutputT> reader =
132+
transform
133+
.getTransform()
134+
.getSource()
135+
.createReader(evaluationContext.getPipelineOptions());) {
136+
contentsRemaining = reader.start();
137+
UncommittedBundle<OutputT> output =
138+
evaluationContext.createRootBundle(transform.getOutput());
139+
while (contentsRemaining) {
140+
output.add(
141+
WindowedValue.timestampedValueInGlobalWindow(
142+
reader.getCurrent(), reader.getCurrentTimestamp()));
143+
contentsRemaining = reader.advance();
144+
}
145+
reader.close();
146+
return StepTransformResult.withHold(transform, BoundedWindow.TIMESTAMP_MAX_VALUE)
147+
.addOutput(output)
148+
.build();
132149
}
133-
return StepTransformResult
134-
.withHold(transform, BoundedWindow.TIMESTAMP_MAX_VALUE)
135-
.addOutput(output)
136-
.build();
137150
}
138151
}
139152
}

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,16 @@ private <OutputT> Queue<UnboundedReadEvaluator<OutputT>> getTransformEvaluatorQu
9999
return evaluatorQueue;
100100
}
101101

102+
/**
103+
* A {@link UnboundedReadEvaluator} produces elements from an underlying {@link UnboundedSource},
104+
* discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator
105+
* creates the {@link UnboundedReader} and consumes some currently available input.
106+
*
107+
* <p>Calls to {@link UnboundedReadEvaluator} are not internally thread-safe, and should only be
108+
* used by a single thread at a time. Each {@link UnboundedReadEvaluator} maintains its own
109+
* checkpoint, and constructs its reader from the current checkpoint in each call to
110+
* {@link #finishBundle()}.
111+
*/
102112
private static class UnboundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
103113
private static final int ARBITRARY_MAX_ELEMENTS = 10;
104114
private final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform;
@@ -122,28 +132,29 @@ public void processElement(WindowedValue<Object> element) {}
122132
@Override
123133
public InProcessTransformResult finishBundle() throws IOException {
124134
UncommittedBundle<OutputT> output = evaluationContext.createRootBundle(transform.getOutput());
125-
UnboundedReader<OutputT> reader =
126-
createReader(
127-
transform.getTransform().getSource(), evaluationContext.getPipelineOptions());
128-
int numElements = 0;
129-
if (reader.start()) {
130-
do {
131-
output.add(
132-
WindowedValue.timestampedValueInGlobalWindow(
133-
reader.getCurrent(), reader.getCurrentTimestamp()));
134-
numElements++;
135-
} while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance());
135+
try (UnboundedReader<OutputT> reader =
136+
createReader(
137+
transform.getTransform().getSource(), evaluationContext.getPipelineOptions());) {
138+
int numElements = 0;
139+
if (reader.start()) {
140+
do {
141+
output.add(
142+
WindowedValue.timestampedValueInGlobalWindow(
143+
reader.getCurrent(), reader.getCurrentTimestamp()));
144+
numElements++;
145+
} while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance());
146+
}
147+
checkpointMark = reader.getCheckpointMark();
148+
checkpointMark.finalizeCheckpoint();
149+
// TODO: When exercising create initial splits, make this the minimum watermark across all
150+
// existing readers
151+
StepTransformResult result =
152+
StepTransformResult.withHold(transform, reader.getWatermark())
153+
.addOutput(output)
154+
.build();
155+
evaluatorQueue.offer(this);
156+
return result;
136157
}
137-
checkpointMark = reader.getCheckpointMark();
138-
checkpointMark.finalizeCheckpoint();
139-
// TODO: When exercising create initial splits, make this the minimum watermark across all
140-
// existing readers
141-
StepTransformResult result =
142-
StepTransformResult.withHold(transform, reader.getWatermark())
143-
.addOutput(output)
144-
.build();
145-
evaluatorQueue.offer(this);
146-
return result;
147158
}
148159

149160
private <CheckpointMarkT extends CheckpointMark> UnboundedReader<OutputT> createReader(

sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java

Lines changed: 135 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,39 @@
1818
import static org.hamcrest.Matchers.containsInAnyOrder;
1919
import static org.hamcrest.Matchers.emptyIterable;
2020
import static org.hamcrest.Matchers.equalTo;
21+
import static org.hamcrest.Matchers.is;
2122
import static org.junit.Assert.assertThat;
2223
import static org.mockito.Mockito.mock;
2324
import static org.mockito.Mockito.when;
2425

26+
import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder;
27+
import com.google.cloud.dataflow.sdk.coders.Coder;
2528
import com.google.cloud.dataflow.sdk.io.BoundedSource;
29+
import com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader;
2630
import com.google.cloud.dataflow.sdk.io.CountingSource;
2731
import com.google.cloud.dataflow.sdk.io.Read;
2832
import com.google.cloud.dataflow.sdk.io.Read.Bounded;
33+
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
34+
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
2935
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
3036
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
37+
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
3138
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
3239
import com.google.cloud.dataflow.sdk.util.WindowedValue;
3340
import com.google.cloud.dataflow.sdk.values.PCollection;
41+
import com.google.common.collect.ImmutableList;
3442

43+
import org.joda.time.Instant;
3544
import org.junit.Before;
3645
import org.junit.Test;
3746
import org.junit.runner.RunWith;
3847
import org.junit.runners.JUnit4;
48+
import org.mockito.Mock;
49+
50+
import java.io.IOException;
51+
import java.util.Arrays;
52+
import java.util.List;
53+
import java.util.NoSuchElementException;
3954

4055
/**
4156
* Tests for {@link BoundedReadEvaluatorFactory}.
@@ -45,7 +60,7 @@ public class BoundedReadEvaluatorFactoryTest {
4560
private BoundedSource<Long> source;
4661
private PCollection<Long> longs;
4762
private TransformEvaluatorFactory factory;
48-
private InProcessEvaluationContext context;
63+
@Mock private InProcessEvaluationContext context;
4964

5065
@Before
5166
public void setup() {
@@ -146,6 +161,125 @@ public void boundedSourceEvaluatorSimultaneousEvaluations() throws Exception {
146161
gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
147162
}
148163

164+
@Test
165+
public void boundedSourceEvaluatorClosesReader() throws Exception {
166+
TestSource<Long> source = new TestSource<>(BigEndianLongCoder.of(), 1L, 2L, 3L);
167+
168+
TestPipeline p = TestPipeline.create();
169+
PCollection<Long> pcollection = p.apply(Read.from(source));
170+
AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
171+
172+
UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs);
173+
when(context.createRootBundle(pcollection)).thenReturn(output);
174+
175+
TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
176+
evaluator.finishBundle();
177+
CommittedBundle<Long> committed = output.commit(Instant.now());
178+
assertThat(committed.getElements(), containsInAnyOrder(gw(2L), gw(3L), gw(1L)));
179+
assertThat(TestSource.readerClosed, is(true));
180+
}
181+
182+
@Test
183+
public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception {
184+
TestSource<Long> source = new TestSource<>(BigEndianLongCoder.of());
185+
186+
TestPipeline p = TestPipeline.create();
187+
PCollection<Long> pcollection = p.apply(Read.from(source));
188+
AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
189+
190+
UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs);
191+
when(context.createRootBundle(pcollection)).thenReturn(output);
192+
193+
TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
194+
evaluator.finishBundle();
195+
CommittedBundle<Long> committed = output.commit(Instant.now());
196+
assertThat(committed.getElements(), emptyIterable());
197+
assertThat(TestSource.readerClosed, is(true));
198+
}
199+
200+
private static class TestSource<T> extends BoundedSource<T> {
201+
private static boolean readerClosed;
202+
private final Coder<T> coder;
203+
private final T[] elems;
204+
205+
public TestSource(Coder<T> coder, T... elems) {
206+
this.elems = elems;
207+
this.coder = coder;
208+
readerClosed = false;
209+
}
210+
211+
@Override
212+
public List<? extends BoundedSource<T>> splitIntoBundles(
213+
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
214+
return ImmutableList.of(this);
215+
}
216+
217+
@Override
218+
public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
219+
return 0;
220+
}
221+
222+
@Override
223+
public boolean producesSortedKeys(PipelineOptions options) throws Exception {
224+
return false;
225+
}
226+
227+
@Override
228+
public BoundedSource.BoundedReader<T> createReader(PipelineOptions options) throws IOException {
229+
return new TestReader<>(this, elems);
230+
}
231+
232+
@Override
233+
public void validate() {
234+
}
235+
236+
@Override
237+
public Coder<T> getDefaultOutputCoder() {
238+
return coder;
239+
}
240+
}
241+
242+
private static class TestReader<T> extends BoundedReader<T> {
243+
private final BoundedSource<T> source;
244+
private final List<T> elems;
245+
private int index;
246+
247+
public TestReader(BoundedSource<T> source, T... elems) {
248+
this.source = source;
249+
this.elems = Arrays.asList(elems);
250+
this.index = -1;
251+
}
252+
253+
@Override
254+
public BoundedSource<T> getCurrentSource() {
255+
return source;
256+
}
257+
258+
@Override
259+
public boolean start() throws IOException {
260+
return advance();
261+
}
262+
263+
@Override
264+
public boolean advance() throws IOException {
265+
if (elems.size() > index + 1) {
266+
index++;
267+
return true;
268+
}
269+
return false;
270+
}
271+
272+
@Override
273+
public T getCurrent() throws NoSuchElementException {
274+
return elems.get(index);
275+
}
276+
277+
@Override
278+
public void close() throws IOException {
279+
TestSource.readerClosed = true;
280+
}
281+
}
282+
149283
private static WindowedValue<Long> gw(Long elem) {
150284
return WindowedValue.valueInGlobalWindow(elem);
151285
}

0 commit comments

Comments
 (0)