Skip to content

Commit 61a2e27

Browse files
kennknowles authored and davorbonaci committed
Expose event time and synchronized upstream processing time
Event time tracks the input event-time watermark for a step. Synchronized processing time tracks the processing time that all upstream workers have reached and for which they have processed timers. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=112803163
1 parent b5b8d95 commit 61a2e27

File tree

12 files changed

+216
-12
lines changed

12 files changed

+216
-12
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingDataflowWorker.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,10 @@ private void dispatchLoop() {
492492
final Instant inputDataWatermark =
493493
WindmillTimeUtils.windmillToHarnessInputWatermark(
494494
computationWork.getInputDataWatermark());
495+
@Nullable
496+
final Instant synchronizedProcessingTime =
497+
WindmillTimeUtils.windmillToHarnessInputWatermark(
498+
computationWork.getDependentRealtimeInputWatermark());
495499
ActiveWorkForComputation activeWork = activeWorkMap.get(computation);
496500
for (final Windmill.WorkItem workItem : computationWork.getWorkList()) {
497501
// May be null if output watermark not yet known.
@@ -504,7 +508,8 @@ private void dispatchLoop() {
504508
Work work = new Work(workItem.getWorkToken()) {
505509
@Override
506510
public void run() {
507-
process(computation, mapTask, inputDataWatermark, outputDataWatermark, workItem);
511+
process(computation, mapTask, inputDataWatermark, outputDataWatermark,
512+
synchronizedProcessingTime, workItem);
508513
}
509514
};
510515
if (activeWork.activateWork(workItem.getKey(), work)) {
@@ -528,6 +533,7 @@ public long getWorkToken() {
528533

529534
private void process(final String computation, final MapTask mapTask,
530535
@Nullable final Instant inputDataWatermark, @Nullable final Instant outputDataWatermark,
536+
@Nullable final Instant synchronizedProcessingTime,
531537
final Windmill.WorkItem work) {
532538
LOG.debug("Starting processing for {}:\n{}", computation, work);
533539

@@ -587,8 +593,8 @@ private void process(final String computation, final MapTask mapTask,
587593
WindmillStateReader stateReader = new WindmillStateReader(
588594
metricTrackingWindmillServer, computation, work.getKey(), work.getWorkToken());
589595
StateFetcher localStateFetcher = stateFetcher.byteTrackingView();
590-
context.start(work, inputDataWatermark, outputDataWatermark, stateReader, localStateFetcher,
591-
outputBuilder);
596+
context.start(work, inputDataWatermark, outputDataWatermark, synchronizedProcessingTime,
597+
stateReader, localStateFetcher, outputBuilder);
592598

593599
for (Long callbackId : context.getReadyCommitCallbackIds()) {
594600
final Runnable callback = commitCallbacks.remove(callbackId);
@@ -688,7 +694,8 @@ public void run() {
688694
workUnitExecutor.forceExecute(new Runnable() {
689695
@Override
690696
public void run() {
691-
process(computation, mapTask, inputDataWatermark, outputDataWatermark, work);
697+
process(computation, mapTask, inputDataWatermark, outputDataWatermark,
698+
synchronizedProcessingTime, work);
692699
}
693700
});
694701
} else {

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingModeExecutionContext.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public class StreamingModeExecutionContext
7070
private Windmill.WorkItem work;
7171
@Nullable private Instant inputDataWatermark;
7272
@Nullable private Instant outputDataWatermark;
73+
@Nullable private Instant synchronizedProcessingTime;
7374
private WindmillStateReader stateReader;
7475
private StateFetcher stateFetcher;
7576
private Windmill.WorkItemCommitRequest.Builder outputBuilder;
@@ -89,27 +90,30 @@ public void start(
8990
Windmill.WorkItem work,
9091
@Nullable Instant inputDataWatermark,
9192
@Nullable Instant outputDataWatermark,
93+
@Nullable Instant synchronizedProcessingTime,
9294
WindmillStateReader stateReader,
9395
StateFetcher stateFetcher,
9496
Windmill.WorkItemCommitRequest.Builder outputBuilder) {
9597
this.work = work;
9698
this.inputDataWatermark = inputDataWatermark;
9799
this.outputDataWatermark = outputDataWatermark;
100+
this.synchronizedProcessingTime = synchronizedProcessingTime;
98101
this.stateReader = stateReader;
99102
this.stateFetcher = stateFetcher;
100103
this.outputBuilder = outputBuilder;
101104
this.sideInputCache.clear();
102105

103106
for (ExecutionContext.StepContext stepContext : getAllStepContexts()) {
104-
((StepContext) stepContext).start(stateReader, inputDataWatermark, outputDataWatermark);
107+
((StepContext) stepContext)
108+
.start(stateReader, inputDataWatermark, outputDataWatermark, synchronizedProcessingTime);
105109
}
106110
}
107111

108112
@Override
109113
public StepContext createStepContext(
110114
String stepName, String transformName, StateSampler stateSampler) {
111115
StepContext context = new StepContext(stepName, transformName, stateSampler);
112-
context.start(stateReader, inputDataWatermark, outputDataWatermark);
116+
context.start(stateReader, inputDataWatermark, outputDataWatermark, synchronizedProcessingTime);
113117
return context;
114118
}
115119

@@ -297,11 +301,11 @@ private static class WindmillTimerInternals implements TimerInternals {
297301
private Map<TimerData, Boolean> timers = new HashMap<>();
298302
@Nullable private Instant inputDataWatermark;
299303
@Nullable private Instant outputDataWatermark;
304+
@Nullable private Instant synchronizedProcessingTime;
300305
private String stateFamily;
301306

302-
public WindmillTimerInternals(
303-
String stateFamily, @Nullable Instant inputDataWatermark,
304-
@Nullable Instant outputDataWatermark) {
307+
public WindmillTimerInternals(String stateFamily, @Nullable Instant inputDataWatermark,
308+
@Nullable Instant outputDataWatermark, @Nullable Instant synchronizedProcessingTime) {
305309
this.inputDataWatermark = inputDataWatermark;
306310
this.outputDataWatermark = outputDataWatermark;
307311
this.stateFamily = stateFamily;
@@ -322,6 +326,12 @@ public Instant currentProcessingTime() {
322326
return Instant.now();
323327
}
324328

329+
@Override
330+
@Nullable
331+
public Instant currentSynchronizedProcessingTime() {
332+
return synchronizedProcessingTime;
333+
}
334+
325335
/**
326336
* {@inheritDoc}
327337
*
@@ -417,12 +427,14 @@ public StateSampler.ScopedState get() {
417427
*/
418428
public void start(
419429
WindmillStateReader stateReader, @Nullable Instant inputDataWatermark,
420-
@Nullable Instant outputDataWatermark) {
430+
@Nullable Instant outputDataWatermark,
431+
@Nullable Instant synchronizedProcessingTime) {
421432
this.stateInternals = new WindmillStateInternals(stateFamily, stateReader,
422433
stateCache.forKey(getSerializedKey(), stateFamily, getWork().getCacheToken()),
423434
scopedReadStateSupplier);
424435
this.timerInternals =
425-
new WindmillTimerInternals(stateFamily, inputDataWatermark, outputDataWatermark);
436+
new WindmillTimerInternals(
437+
stateFamily, inputDataWatermark, outputDataWatermark, synchronizedProcessingTime);
426438
}
427439

428440
public void flushState() {

sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Trigger.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,14 @@ public abstract class TriggerContext {
264264

265265
/** The current processing time. */
266266
public abstract Instant currentProcessingTime();
267+
268+
/** The current synchronized upstream processing time or {@code null} if unknown. */
269+
@Nullable
270+
public abstract Instant currentSynchronizedProcessingTime();
271+
272+
/** The current event time for the input or {@code null} if unknown. */
273+
@Nullable
274+
public abstract Instant currentEventTime();
267275
}
268276

269277
/**

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BatchTimerInternals.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,18 @@ public Instant currentProcessingTime() {
6969
return processingTime;
7070
}
7171

72+
/**
73+
* {@inheritDoc}
74+
*
75+
* @return {@link BoundedWindow#TIMESTAMP_MAX_VALUE}: in batch mode, upstream processing
76+
* is already complete.
77+
*/
78+
@Override
79+
@Nullable
80+
public Instant currentSynchronizedProcessingTime() {
81+
return BoundedWindow.TIMESTAMP_MAX_VALUE;
82+
}
83+
7284
@Override
7385
public Instant currentInputWatermarkTime() {
7486
return inputWatermarkTime;

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFn.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import java.util.Collection;
3030
import java.util.Map;
3131

32+
import javax.annotation.Nullable;
33+
3234
/**
3335
* Specification for processing to happen after elements have been grouped by key.
3436
*
@@ -103,6 +105,14 @@ public interface Timers {
103105

104106
/** Returns the current processing time. */
105107
public abstract Instant currentProcessingTime();
108+
109+
/** Returns the current synchronized processing time or {@code null} if unknown. */
110+
@Nullable
111+
public abstract Instant currentSynchronizedProcessingTime();
112+
113+
/** Returns the current event time or {@code null} if unknown. */
114+
@Nullable
115+
public abstract Instant currentEventTime();
106116
}
107117

108118
/** Information accessible to all the processing methods in this {@code ReduceFn}. */

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
import java.util.List;
4141
import java.util.Map;
4242

43+
import javax.annotation.Nullable;
44+
4345
/**
4446
* Factory for creating instances of the various {@link ReduceFn} contexts.
4547
*/
@@ -117,6 +119,18 @@ public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
117119
public Instant currentProcessingTime() {
118120
return timerInternals.currentProcessingTime();
119121
}
122+
123+
@Override
124+
@Nullable
125+
public Instant currentSynchronizedProcessingTime() {
126+
return timerInternals.currentSynchronizedProcessingTime();
127+
}
128+
129+
@Override
130+
@Nullable
131+
public Instant currentEventTime() {
132+
return timerInternals.currentInputWatermarkTime();
133+
}
120134
}
121135

122136
static class StateContextImpl<W extends BoundedWindow>

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,13 @@ public interface TimerInternals {
5454
*/
5555
Instant currentProcessingTime();
5656

57+
/**
58+
* Returns the current timestamp in the {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} time
59+
* domain or {@code null} if unknown.
60+
*/
61+
@Nullable
62+
Instant currentSynchronizedProcessingTime();
63+
5764
/**
5865
* Return the current, local input watermark timestamp for this computation
5966
* in the {@link TimeDomain#EVENT_TIME} time domain. Return {@code null} if unknown.

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import java.util.Collection;
3434
import java.util.Map;
3535

36+
import javax.annotation.Nullable;
37+
3638
/**
3739
* Factory for creating instances of the various {@link Trigger} contexts.
3840
*
@@ -193,6 +195,18 @@ public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
193195
public Instant currentProcessingTime() {
194196
return timers.currentProcessingTime();
195197
}
198+
199+
@Override
200+
@Nullable
201+
public Instant currentSynchronizedProcessingTime() {
202+
return timers.currentSynchronizedProcessingTime();
203+
}
204+
205+
@Override
206+
@Nullable
207+
public Instant currentEventTime() {
208+
return timers.currentEventTime();
209+
}
196210
}
197211

198212
private class MergingTriggerInfoImpl
@@ -305,8 +319,19 @@ public void deleteTimer(Instant timestamp, TimeDomain domain) {
305319
public Instant currentProcessingTime() {
306320
return timers.currentProcessingTime();
307321
}
308-
}
309322

323+
@Override
324+
@Nullable
325+
public Instant currentSynchronizedProcessingTime() {
326+
return timers.currentSynchronizedProcessingTime();
327+
}
328+
329+
@Override
330+
@Nullable
331+
public Instant currentEventTime() {
332+
return timers.currentEventTime();
333+
}
334+
}
310335

311336
private class OnElementContextImpl extends Trigger<W>.OnElementContext {
312337

@@ -370,6 +395,18 @@ public void deleteTimer(Instant timestamp, TimeDomain domain) {
370395
public Instant currentProcessingTime() {
371396
return timers.currentProcessingTime();
372397
}
398+
399+
@Override
400+
@Nullable
401+
public Instant currentSynchronizedProcessingTime() {
402+
return timers.currentSynchronizedProcessingTime();
403+
}
404+
405+
@Override
406+
@Nullable
407+
public Instant currentEventTime() {
408+
return timers.currentEventTime();
409+
}
373410
}
374411

375412
private class OnTimerContextImpl extends Trigger<W>.OnTimerContext {
@@ -435,6 +472,18 @@ public void deleteTimer(Instant timestamp, TimeDomain domain) {
435472
public Instant currentProcessingTime() {
436473
return timers.currentProcessingTime();
437474
}
475+
476+
@Override
477+
@Nullable
478+
public Instant currentSynchronizedProcessingTime() {
479+
return timers.currentSynchronizedProcessingTime();
480+
}
481+
482+
@Override
483+
@Nullable
484+
public Instant currentEventTime() {
485+
return timers.currentEventTime();
486+
}
438487
}
439488

440489
private class OnMergeContextImpl extends Trigger<W>.OnMergeContext {
@@ -499,5 +548,17 @@ public void deleteTimer(Instant timestamp, TimeDomain domain) {
499548
public Instant currentProcessingTime() {
500549
return timers.currentProcessingTime();
501550
}
551+
552+
@Override
553+
@Nullable
554+
public Instant currentSynchronizedProcessingTime() {
555+
return timers.currentSynchronizedProcessingTime();
556+
}
557+
558+
@Override
559+
@Nullable
560+
public Instant currentEventTime() {
561+
return timers.currentEventTime();
562+
}
502563
}
503564
}

sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSourcesTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,7 @@ public void testReadUnboundedReader() throws Exception {
646646
.build(),
647647
new Instant(0), // input watermark
648648
null, // output watermark
649+
null, // synchronized processing time
649650
null, // StateReader
650651
null, // StateFetcher
651652
Windmill.WorkItemCommitRequest.newBuilder());

sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingModeExecutionContextTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ public void testTimerInternalsSetTimer() {
8686
Windmill.WorkItem.newBuilder().setKey(ByteString.EMPTY).setWorkToken(17L).build(),
8787
new Instant(1000), // input watermark
8888
null, // output watermark
89+
null, // synchronized processing time
8990
stateReader, stateFetcher, outputBuilder);
9091
ExecutionContext.StepContext step =
9192
executionContext.getOrCreateStepContext("step", "transform", null);
@@ -148,6 +149,7 @@ private void startContext(
148149
.build(),
149150
new Instant(0), // input watermark
150151
null, // output watermark
152+
null, // synchronized processing time
151153
null, // StateReader
152154
null, // StateFetcher
153155
Windmill.WorkItemCommitRequest.newBuilder());

0 commit comments

Comments
 (0)