Skip to content

Commit ec5dfe8

Browse files
peihedavorbonaci
authored andcommitted
Create StreamingGroupAlsoByWindowsDoFnRunner
Move the input elements lateness check to DoFnRunner. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=112982861
1 parent 4ed7f13 commit ec5dfe8

27 files changed

+1459
-1000
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/GroupAlsoByWindowsParDoFn.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,15 @@ class GroupAlsoByWindowsParDoFn extends ParDoFnBase {
6565
static GroupAlsoByWindowsParDoFn of(
6666
PipelineOptions options,
6767
DoFn<?, ?> groupAlsoByWindowsDoFn,
68+
WindowingStrategy<?, ?> windowingStrategy,
6869
String stepName,
6970
String transformName,
70-
DataflowExecutionContext executionContext,
71+
DataflowExecutionContext<?> executionContext,
7172
CounterSet.AddCounterMutator addCounterMutator,
7273
StateSampler stateSampler)
7374
throws Exception {
74-
return new GroupAlsoByWindowsParDoFn(options, groupAlsoByWindowsDoFn, stepName, transformName,
75-
executionContext, addCounterMutator, stateSampler);
75+
return new GroupAlsoByWindowsParDoFn(options, groupAlsoByWindowsDoFn, windowingStrategy,
76+
stepName, transformName, executionContext, addCounterMutator, stateSampler);
7677
}
7778

7879
/**
@@ -160,6 +161,7 @@ public ParDoFn create(
160161
return GroupAlsoByWindowsParDoFn.of(
161162
options,
162163
groupAlsoByWindowsDoFn,
164+
windowingStrategy,
163165
stepName,
164166
transformName,
165167
executionContext,
@@ -170,7 +172,7 @@ public ParDoFn create(
170172

171173
@Override
172174
protected DoFnInfo<?, ?> getDoFnInfo() {
173-
return new DoFnInfo<>(groupAlsoByWindowsDoFn, null);
175+
return new DoFnInfo<>(groupAlsoByWindowsDoFn, windowingStrategy);
174176
}
175177

176178
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -262,13 +264,15 @@ public Coder<List<AccumT>> getAccumulatorCoder(CoderRegistry registry, Coder<K>
262264
}
263265

264266
private final DoFn<?, ?> groupAlsoByWindowsDoFn;
267+
private final WindowingStrategy<?, ?> windowingStrategy;
265268

266269
private GroupAlsoByWindowsParDoFn(
267270
PipelineOptions options,
268271
DoFn<?, ?> groupAlsoByWindowsDoFn,
272+
WindowingStrategy<?, ?> windowingStrategy,
269273
String stepName,
270274
String transformName,
271-
DataflowExecutionContext executionContext,
275+
DataflowExecutionContext<?> executionContext,
272276
CounterSet.AddCounterMutator addCounterMutator,
273277
StateSampler stateSampler) {
274278
super(
@@ -281,5 +285,6 @@ private GroupAlsoByWindowsParDoFn(
281285
addCounterMutator,
282286
stateSampler);
283287
this.groupAlsoByWindowsDoFn = groupAlsoByWindowsDoFn;
288+
this.windowingStrategy = windowingStrategy;
284289
}
285290
}

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/KeyedWorkItem.java

Lines changed: 13 additions & 208 deletions
Original file line numberDiff line numberDiff line change
@@ -13,228 +13,33 @@
1313
* License for the specific language governing permissions and limitations under
1414
* the License.
1515
*/
16-
1716
package com.google.cloud.dataflow.sdk.runners.worker;
1817

19-
import com.google.cloud.dataflow.sdk.coders.Coder;
20-
import com.google.cloud.dataflow.sdk.coders.StandardCoder;
21-
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
22-
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill.InputMessageBundle;
23-
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill.Message;
24-
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill.Timer;
25-
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
26-
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
27-
import com.google.cloud.dataflow.sdk.util.PropertyNames;
28-
import com.google.cloud.dataflow.sdk.util.TimeDomain;
2918
import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
3019
import com.google.cloud.dataflow.sdk.util.WindowedValue;
31-
import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
32-
import com.google.cloud.dataflow.sdk.util.state.StateNamespace;
33-
import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
34-
import com.google.common.base.Function;
35-
import com.google.common.base.Predicate;
36-
import com.google.common.base.Predicates;
37-
import com.google.common.base.Throwables;
38-
import com.google.common.collect.FluentIterable;
39-
40-
import com.fasterxml.jackson.annotation.JsonCreator;
41-
import com.fasterxml.jackson.annotation.JsonProperty;
42-
43-
import org.joda.time.Instant;
44-
45-
import java.io.IOException;
46-
import java.io.InputStream;
47-
import java.io.OutputStream;
48-
import java.util.Arrays;
49-
import java.util.Collection;
50-
import java.util.List;
51-
import java.util.Objects;
52-
import java.util.concurrent.TimeUnit;
5320

5421
/**
55-
* Wrapper around a {@link Windmill.WorkItem} which contains all the timers and elements associated
56-
* with a specific work item.
22+
* Interface that contains all the timers and elements associated with a specific work item.
5723
*
5824
* <p>Used as the input type of {@link StreamingGroupAlsoByWindowsDoFn}.
5925
*
26+
* @param <K> the key type
6027
* @param <ElemT> the element type
6128
*/
62-
public class KeyedWorkItem<ElemT> {
63-
64-
private static final Predicate<Timer> IS_WATERMARK = new Predicate<Timer>() {
65-
@Override
66-
public boolean apply(Timer input) {
67-
return input.getType() == Timer.Type.WATERMARK;
68-
}
69-
};
70-
71-
public static <ElemT> KeyedWorkItem<ElemT> workItem(
72-
Object key,
73-
Windmill.WorkItem workItem,
74-
Coder<? extends BoundedWindow> windowCoder,
75-
Coder<Collection<? extends BoundedWindow>> windowsCoder,
76-
Coder<ElemT> valueCoder) {
77-
return new KeyedWorkItem<>(key, workItem, windowCoder, windowsCoder, valueCoder);
78-
}
79-
80-
public Object key() {
81-
return key;
82-
}
83-
84-
public Iterable<TimerData> timersIterable() {
85-
FluentIterable<Timer> allTimers = FluentIterable.from(workItem.getTimers().getTimersList());
86-
FluentIterable<Timer> eventTimers = allTimers.filter(IS_WATERMARK);
87-
FluentIterable<Timer> nonEventTimers = allTimers.filter(Predicates.not(IS_WATERMARK));
88-
return eventTimers.append(nonEventTimers).transform(new Function<Timer, TimerData>() {
89-
private TimeDomain getTimeDomain(Windmill.Timer.Type type) {
90-
switch (type) {
91-
case REALTIME:
92-
return TimeDomain.PROCESSING_TIME;
93-
case DEPENDENT_REALTIME:
94-
return TimeDomain.SYNCHRONIZED_PROCESSING_TIME;
95-
case WATERMARK:
96-
return TimeDomain.EVENT_TIME;
97-
default:
98-
throw new IllegalArgumentException("Unsupported timer type " + type);
99-
}
100-
}
101-
102-
@Override
103-
public TimerData apply(Timer timer) {
104-
String tag = timer.getTag().toStringUtf8();
105-
String namespaceString = tag.substring(0, tag.indexOf('+'));
106-
StateNamespace namespace = StateNamespaces.fromString(namespaceString, windowCoder);
107-
108-
Instant timestamp = new Instant(TimeUnit.MICROSECONDS.toMillis(timer.getTimestamp()));
109-
return TimerData.of(namespace, timestamp, getTimeDomain(timer.getType()));
110-
}
111-
});
112-
}
113-
114-
public Iterable<WindowedValue<ElemT>> elementsIterable() {
115-
return FluentIterable.from(workItem.getMessageBundlesList())
116-
.transformAndConcat(new Function<InputMessageBundle, Iterable<Message>>() {
117-
@Override
118-
public Iterable<Message> apply(InputMessageBundle input) {
119-
return input.getMessagesList();
120-
}
121-
})
122-
.transform(new Function<Message, WindowedValue<ElemT>>() {
123-
@Override
124-
public WindowedValue<ElemT> apply(Message message) {
125-
try {
126-
Instant timestamp = new Instant(
127-
TimeUnit.MICROSECONDS.toMillis(message.getTimestamp()));
128-
Collection<? extends BoundedWindow> windows =
129-
WindmillSink.decodeMetadataWindows(windowsCoder, message.getMetadata());
130-
PaneInfo pane = WindmillSink.decodeMetadataPane(message.getMetadata());
131-
132-
InputStream inputStream = message.getData().newInput();
133-
ElemT value = valueCoder.decode(inputStream, Coder.Context.OUTER);
134-
return WindowedValue.of(value, timestamp, windows, pane);
135-
} catch (IOException e) {
136-
throw Throwables.propagate(e);
137-
}
138-
}
139-
});
140-
}
141-
142-
@Override
143-
public boolean equals(Object other) {
144-
if (!(other instanceof KeyedWorkItem)) {
145-
return false;
146-
}
147-
148-
KeyedWorkItem<?> that = (KeyedWorkItem<?>) other;
149-
return Objects.equals(this.key, that.key)
150-
&& Objects.equals(this.workItem, that.workItem);
151-
}
152-
153-
@Override
154-
public int hashCode() {
155-
return Objects.hash(key, workItem);
156-
}
29+
public interface KeyedWorkItem<K, ElemT> {
15730

15831
/**
159-
* Coder that forwards {@code ByteSizeObserver} calls to an underlying element coder.
160-
* {@code TimerOrElement} objects never need to be encoded, so this class does not
161-
* support the {@code encode} and {@code decode} methods.
32+
* Returns the key.
16233
*/
163-
public static class KeyedWorkItemCoder<T> extends StandardCoder<KeyedWorkItem<T>> {
164-
final Coder<T> elemCoder;
165-
166-
/**
167-
* Creates a new {@code TimerOrElement.Coder} that wraps the given {@link Coder}.
168-
*/
169-
public static <T> KeyedWorkItemCoder<T> of(Coder<T> elemCoder) {
170-
return new KeyedWorkItemCoder<>(elemCoder);
171-
}
34+
public K key();
17235

173-
@JsonCreator
174-
public static KeyedWorkItemCoder<?> of(
175-
@JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
176-
List<Object> components) {
177-
return of((Coder<?>) components.get(0));
178-
}
179-
180-
@Override
181-
public void encode(KeyedWorkItem<T> value, OutputStream outStream, Context context) {
182-
throw new UnsupportedOperationException();
183-
}
184-
185-
@Override
186-
public KeyedWorkItem<T> decode(InputStream inStream, Context context) {
187-
throw new UnsupportedOperationException();
188-
}
189-
190-
@Override
191-
public boolean isRegisterByteSizeObserverCheap(KeyedWorkItem<T> value, Context context) {
192-
return true;
193-
}
194-
195-
@Override
196-
public void registerByteSizeObserver(
197-
KeyedWorkItem<T> value, ElementByteSizeObserver observer, Context context)
198-
throws Exception {
199-
observer.update((long) value.workItem.getSerializedSize());
200-
}
201-
202-
@Override
203-
public void verifyDeterministic() throws NonDeterministicException {}
204-
205-
@Override
206-
public List<? extends Coder<?>> getCoderArguments() {
207-
return Arrays.asList(elemCoder);
208-
}
209-
210-
public Coder<T> getElementCoder() {
211-
return elemCoder;
212-
}
213-
214-
protected KeyedWorkItemCoder(Coder<T> elemCoder) {
215-
this.elemCoder = elemCoder;
216-
}
217-
}
218-
219-
//////////////////////////////////////////////////////////////////////////////
220-
221-
private final Windmill.WorkItem workItem;
222-
private final Object key;
223-
224-
private final transient Coder<? extends BoundedWindow> windowCoder;
225-
private final transient Coder<Collection<? extends BoundedWindow>> windowsCoder;
226-
private final transient Coder<ElemT> valueCoder;
36+
/**
37+
* Returns the timers iterable.
38+
*/
39+
public Iterable<TimerData> timersIterable();
22740

228-
KeyedWorkItem(
229-
Object key,
230-
Windmill.WorkItem workItem,
231-
Coder<? extends BoundedWindow> windowCoder,
232-
Coder<Collection<? extends BoundedWindow>> windowsCoder,
233-
Coder<ElemT> valueCoder) {
234-
this.key = key;
235-
this.workItem = workItem;
236-
this.windowCoder = windowCoder;
237-
this.windowsCoder = windowsCoder;
238-
this.valueCoder = valueCoder;
239-
}
41+
/**
42+
* Returns the elements iterable.
43+
*/
44+
public Iterable<WindowedValue<ElemT>> elementsIterable();
24045
}

0 commit comments

Comments
 (0)