Skip to content

Commit 61853d1

Browse files
tgrohdavorbonaci
authored andcommitted
Refactor LateDataDroppingDoFnRunner
This moves the LateDataDroppingDoFnRunner into util, removes references to worker code from util, and genericizes the general GroupAlsoByWindowsDoFn. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=113405451
1 parent 8b4c34e commit 61853d1

20 files changed

+373
-187
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/KeyedWorkItems.java

Lines changed: 5 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill.Timer;
2424
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
2525
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
26+
import com.google.cloud.dataflow.sdk.util.ComposedKeyedWorkItem;
27+
import com.google.cloud.dataflow.sdk.util.KeyedWorkItem;
2628
import com.google.cloud.dataflow.sdk.util.PropertyNames;
2729
import com.google.cloud.dataflow.sdk.util.TimeDomain;
2830
import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
@@ -79,7 +81,7 @@ public static <K, ElemT> KeyedWorkItem<K, ElemT> windmillWorkItem(
7981
*/
8082
public static <K, ElemT> KeyedWorkItem<K, ElemT> elementsWorkItem(
8183
K key, Iterable<WindowedValue<ElemT>> elementsIterable) {
82-
return new ComposedKeyedWorkItem<>(key, Collections.<TimerData>emptyList(), elementsIterable);
84+
return ComposedKeyedWorkItem.create(key, Collections.<TimerData>emptyList(), elementsIterable);
8385
}
8486

8587
/**
@@ -90,7 +92,7 @@ public static <K, ElemT> KeyedWorkItem<K, ElemT> elementsWorkItem(
9092
*/
9193
public static <K, ElemT> KeyedWorkItem<K, ElemT> timersWorkItem(
9294
K key, Iterable<TimerData> timersIterable) {
93-
return new ComposedKeyedWorkItem<>(
95+
return ComposedKeyedWorkItem.create(
9496
key, timersIterable, Collections.<WindowedValue<ElemT>>emptyList());
9597
}
9698

@@ -103,8 +105,7 @@ public static <K, ElemT> KeyedWorkItem<K, ElemT> timersWorkItem(
103105
*/
104106
public static <K, ElemT> KeyedWorkItem<K, ElemT> workItem(
105107
K key, Iterable<TimerData> timersIterable, Iterable<WindowedValue<ElemT>> elementsIterable) {
106-
return new ComposedKeyedWorkItem<>(
107-
key, timersIterable, elementsIterable);
108+
return ComposedKeyedWorkItem.create(key, timersIterable, elementsIterable);
108109
}
109110

110111
private static class WindmillKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> {
@@ -217,33 +218,6 @@ public int hashCode() {
217218
}
218219
}
219220

220-
private static class ComposedKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> {
221-
private final K key;
222-
private final Iterable<TimerData> timersIterable;
223-
private final Iterable<WindowedValue<ElemT>> elementsIterable;
224-
225-
ComposedKeyedWorkItem(K key, Iterable<TimerData> timersIterable,
226-
Iterable<WindowedValue<ElemT>> elementsIterable) {
227-
this.key = key;
228-
this.timersIterable = timersIterable;
229-
this.elementsIterable = elementsIterable;
230-
}
231-
@Override
232-
public K key() {
233-
return key;
234-
}
235-
236-
@Override
237-
public Iterable<TimerData> timersIterable() {
238-
return timersIterable;
239-
}
240-
241-
@Override
242-
public Iterable<WindowedValue<ElemT>> elementsIterable() {
243-
return elementsIterable;
244-
}
245-
}
246-
247221
/**
248222
* Coder that forwards {@code ByteSizeObserver} calls to an underlying element coder.
249223
* {@code TimerOrElement} objects never need to be encoded, so this class does not

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/ParDoFnBase.java

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.google.cloud.dataflow.sdk.transforms.DoFn;
2222
import com.google.cloud.dataflow.sdk.util.DoFnInfo;
2323
import com.google.cloud.dataflow.sdk.util.DoFnRunner;
24-
import com.google.cloud.dataflow.sdk.util.DoFnRunner.RequiresLateDataDropping;
2524
import com.google.cloud.dataflow.sdk.util.DoFnRunners;
2625
import com.google.cloud.dataflow.sdk.util.DoFnRunners.OutputManager;
2726
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
@@ -179,21 +178,8 @@ public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
179178
}
180179
};
181180

182-
if (doFnInfo.getDoFn() instanceof StreamingGroupAlsoByWindowsDoFn
183-
&& doFnInfo.getDoFn() instanceof RequiresLateDataDropping) {
184-
@SuppressWarnings({"unchecked", "rawtypes"})
185-
DoFnRunner<Object, Object> objectFnRunner = DoFnRunners.lateDataDroppingRunner(
186-
options,
187-
(DoFnInfo) doFnInfo,
188-
sideInputReader,
189-
outputManager,
190-
(TupleTag) mainOutputTag,
191-
sideOutputTags,
192-
stepContext,
193-
addCounterMutator);
194-
fnRunner = objectFnRunner;
195-
} else if (hasStreamingSideInput) {
196-
fnRunner = DoFnRunners.streamingSideInputRunner(
181+
if (hasStreamingSideInput) {
182+
fnRunner = StreamingDoFnRunners.streamingSideInputRunner(
197183
options,
198184
doFnInfo,
199185
sideInputReader,
@@ -203,7 +189,7 @@ public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
203189
stepContext,
204190
addCounterMutator);
205191
} else {
206-
fnRunner = DoFnRunners.simpleRunner(
192+
fnRunner = DoFnRunners.createDefault(
207193
options,
208194
doFnInfo.getDoFn(),
209195
sideInputReader,
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright (C) 2015 Google Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.dataflow.sdk.runners.worker;
17+
18+
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
19+
import com.google.cloud.dataflow.sdk.util.DoFnInfo;
20+
import com.google.cloud.dataflow.sdk.util.DoFnRunner;
21+
import com.google.cloud.dataflow.sdk.util.DoFnRunners.OutputManager;
22+
import com.google.cloud.dataflow.sdk.util.ExecutionContext.StepContext;
23+
import com.google.cloud.dataflow.sdk.util.SideInputReader;
24+
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
25+
import com.google.cloud.dataflow.sdk.values.TupleTag;
26+
27+
import java.util.List;
28+
29+
/**
30+
* Utility methods for creating {@link DoFnRunner} instances used by streaming Dataflow.
31+
*/
32+
public final class StreamingDoFnRunners {
33+
private StreamingDoFnRunners() {
34+
// Do not instantiate
35+
}
36+
37+
/**
38+
* Returns an implementation of {@link DoFnRunner} that handles streaming side inputs.
39+
*
40+
* <p>It blocks and caches input elements if their side inputs are not ready.
41+
*/
42+
public static <InputT, OutputT> DoFnRunner<InputT, OutputT> streamingSideInputRunner(
43+
PipelineOptions options,
44+
DoFnInfo<InputT, OutputT> doFnInfo,
45+
SideInputReader sideInputReader,
46+
OutputManager outputManager,
47+
TupleTag<OutputT> mainOutputTag,
48+
List<TupleTag<?>> sideOutputTags,
49+
StepContext stepContext,
50+
CounterSet.AddCounterMutator addCounterMutator) {
51+
return new StreamingSideInputDoFnRunner<>(
52+
options,
53+
doFnInfo,
54+
sideInputReader,
55+
outputManager,
56+
mainOutputTag,
57+
sideOutputTags,
58+
stepContext,
59+
addCounterMutator,
60+
doFnInfo.getWindowingStrategy());
61+
}
62+
63+
64+
}
65+

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingGroupAlsoByWindowsDoFn.java

Lines changed: 11 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,13 @@
1717
package com.google.cloud.dataflow.sdk.runners.worker;
1818

1919
import com.google.cloud.dataflow.sdk.coders.Coder;
20-
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
2120
import com.google.cloud.dataflow.sdk.transforms.DoFn;
22-
import com.google.cloud.dataflow.sdk.transforms.Sum;
2321
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
2422
import com.google.cloud.dataflow.sdk.util.AppliedCombineFn;
25-
import com.google.cloud.dataflow.sdk.util.DoFnRunner.RequiresLateDataDropping;
26-
import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn;
27-
import com.google.cloud.dataflow.sdk.util.ReduceFnRunner;
23+
import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowViaWindowSetDoFn;
24+
import com.google.cloud.dataflow.sdk.util.KeyedWorkItem;
2825
import com.google.cloud.dataflow.sdk.util.SystemDoFnInternal;
2926
import com.google.cloud.dataflow.sdk.util.SystemReduceFn;
30-
import com.google.cloud.dataflow.sdk.util.TimerInternals;
31-
import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
3227
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
3328
import com.google.cloud.dataflow.sdk.values.KV;
3429
import com.google.common.base.Preconditions;
@@ -44,64 +39,28 @@
4439
@SystemDoFnInternal
4540
public abstract class StreamingGroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends BoundedWindow>
4641
extends DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> {
47-
protected final Aggregator<Long, Long> droppedDueToClosedWindow = createAggregator(
48-
GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, new Sum.SumLongFn());
49-
protected final Aggregator<Long, Long> droppedDueToLateness = createAggregator(
50-
GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, new Sum.SumLongFn());
51-
5242
public static <K, InputT, AccumT, OutputT, W extends BoundedWindow>
53-
StreamingGroupAlsoByWindowsDoFn<K, InputT, OutputT, W> create(
43+
DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
5444
final WindowingStrategy<?, W> windowingStrategy,
5545
final AppliedCombineFn<K, InputT, AccumT, OutputT> combineFn,
5646
final Coder<K> keyCoder) {
5747
Preconditions.checkNotNull(combineFn);
58-
return new StreamingGABWViaWindowSetDoFn<>(windowingStrategy,
59-
SystemReduceFn.<K, InputT, AccumT, OutputT, W>combining(keyCoder, combineFn));
48+
DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> fn =
49+
GroupAlsoByWindowViaWindowSetDoFn.create(
50+
windowingStrategy,
51+
SystemReduceFn.<K, InputT, AccumT, OutputT, W>combining(keyCoder, combineFn));
52+
return fn;
6053
}
6154

6255
public static <K, V, W extends BoundedWindow>
63-
DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> createForIterable(
64-
final WindowingStrategy<?, W> windowingStrategy,
65-
final Coder<V> inputCoder) {
56+
DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> createForIterable(
57+
final WindowingStrategy<?, W> windowingStrategy, final Coder<V> inputCoder) {
6658
// If the windowing strategy indicates we're doing a reshuffle, use the special-path.
6759
if (StreamingGroupAlsoByWindowsReshuffleDoFn.isReshuffle(windowingStrategy)) {
6860
return new StreamingGroupAlsoByWindowsReshuffleDoFn<>();
6961
} else {
70-
return new StreamingGABWViaWindowSetDoFn<>(
62+
return GroupAlsoByWindowViaWindowSetDoFn.create(
7163
windowingStrategy, SystemReduceFn.<K, V, W>buffering(inputCoder));
7264
}
7365
}
74-
75-
private static class StreamingGABWViaWindowSetDoFn<K, InputT, OutputT, W extends BoundedWindow>
76-
extends StreamingGroupAlsoByWindowsDoFn<K, InputT, OutputT, W>
77-
implements RequiresLateDataDropping {
78-
private final WindowingStrategy<Object, W> windowingStrategy;
79-
private SystemReduceFn.Factory<K, InputT, OutputT, W> reduceFnFactory;
80-
81-
public StreamingGABWViaWindowSetDoFn(WindowingStrategy<?, W> windowingStrategy,
82-
SystemReduceFn.Factory<K, InputT, OutputT, W> reduceFnFactory) {
83-
@SuppressWarnings("unchecked")
84-
WindowingStrategy<Object, W> noWildcard = (WindowingStrategy<Object, W>) windowingStrategy;
85-
this.windowingStrategy = noWildcard;
86-
this.reduceFnFactory = reduceFnFactory;
87-
}
88-
89-
@Override
90-
public void processElement(ProcessContext c) throws Exception {
91-
KeyedWorkItem<K, InputT> element = c.element();
92-
93-
@SuppressWarnings("unchecked")
94-
K key = c.element().key();
95-
TimerInternals timerInternals = c.windowingInternals().timerInternals();
96-
ReduceFnRunner<K, InputT, OutputT, W> runner = new ReduceFnRunner<>(
97-
key, windowingStrategy, timerInternals, c.windowingInternals(),
98-
droppedDueToClosedWindow, reduceFnFactory.create(key));
99-
100-
for (TimerData timer : element.timersIterable()) {
101-
runner.onTimer(timer);
102-
}
103-
runner.processElements(element.elementsIterable());
104-
runner.persist();
105-
}
106-
}
10766
}

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingGroupAlsoByWindowsReshuffleDoFn.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.google.cloud.dataflow.sdk.runners.worker;
1717

1818
import com.google.cloud.dataflow.sdk.transforms.DoFn;
19+
import com.google.cloud.dataflow.sdk.util.KeyedWorkItem;
1920
import com.google.cloud.dataflow.sdk.util.ReshuffleTrigger;
2021
import com.google.cloud.dataflow.sdk.util.SystemDoFnInternal;
2122
import com.google.cloud.dataflow.sdk.util.WindowedValue;

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/WindowingWindmillReader.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
2525
import com.google.cloud.dataflow.sdk.util.CloudObject;
2626
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
27+
import com.google.cloud.dataflow.sdk.util.KeyedWorkItem;
2728
import com.google.cloud.dataflow.sdk.util.WindowedValue;
2829
import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder;
2930
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
@@ -99,7 +100,7 @@ public NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>> iterator() throw
99100
final WorkItem workItem = context.getWork();
100101
final WindowedValue<KeyedWorkItem<K, T>> value =
101102
WindowedValue.valueInEmptyWindows(
102-
KeyedWorkItems.windmillWorkItem(
103+
KeyedWorkItems.<K, T>windmillWorkItem(
103104
key, workItem, windowCoder, windowsCoder, kvCoder.getValueCoder()));
104105

105106
return new NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>>() {

sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnTester.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,7 @@ void initializeState() {
481481
runnerSideInputs = runnerSideInputs.and(entry.getKey().getTagInternal(), entry.getValue());
482482
}
483483
outputManager = new DoFnRunnerBase.ListOutputManager();
484-
fnRunner = DoFnRunners.simpleRunner(
484+
fnRunner = DoFnRunners.createDefault(
485485
options,
486486
fn,
487487
DirectSideInputReader.of(runnerSideInputs),

sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/ParDo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1144,7 +1144,7 @@ private static <InputT, OutputT, ActualInputT extends InputT> void evaluateHelpe
11441144
outputs);
11451145

11461146
DoFnRunner<InputT, OutputT> fnRunner =
1147-
DoFnRunners.simpleRunner(
1147+
DoFnRunners.createDefault(
11481148
context.getPipelineOptions(),
11491149
fn,
11501150
sideInputReader,
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright (C) 2015 Google Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.dataflow.sdk.util;
17+
18+
import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
19+
20+
/**
21+
* A {@link KeyedWorkItem} composed of an underlying key, {@link TimerData} iterable, and element
22+
* iterable.
23+
*/
24+
public class ComposedKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> {
25+
26+
private final K key;
27+
private final Iterable<TimerData> timers;
28+
private final Iterable<WindowedValue<ElemT>> elements;
29+
30+
public static <K, ElemT> ComposedKeyedWorkItem<K, ElemT> create(
31+
K key, Iterable<TimerData> timers, Iterable<WindowedValue<ElemT>> elements) {
32+
return new ComposedKeyedWorkItem<K, ElemT>(key, timers, elements);
33+
}
34+
35+
private ComposedKeyedWorkItem(
36+
K key, Iterable<TimerData> timers, Iterable<WindowedValue<ElemT>> elements) {
37+
this.key = key;
38+
this.timers = timers;
39+
this.elements = elements;
40+
}
41+
42+
@Override
43+
public K key() {
44+
return key;
45+
}
46+
47+
@Override
48+
public Iterable<TimerData> timersIterable() {
49+
return timers;
50+
}
51+
52+
@Override
53+
public Iterable<WindowedValue<ElemT>> elementsIterable() {
54+
return elements;
55+
}
56+
}

0 commit comments

Comments
 (0)