Skip to content

Commit cfda3ff

Browse files
kennknowlesdavorbonaci
authored andcommitted
Update Triggers to new shouldFire() based semantics
User-facing trigger expressions are unchanged in syntax or semantics. The high-level change to the internal-facing API is as follows: - onElement no longer returns a TriggerResult, now void - onMerge no longer returns a MergeResult, now void - onTimer no longer exists in Trigger (it exists in the runner only) - shouldFire says whether to fire; the runner controls when to ask - onFire transitions/resets a trigger for sequential firings This also addresses a particular semantic issue where OrFinally did not fire as soon as expected. ----Release Notes---- - The "finally" branch of OrFinally now fires as soon as it is satisfied; before it would often take much longer due to seeing only the prefix of each work unit that satisfied the "main" branch. [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=112809409
1 parent 61a2e27 commit cfda3ff

36 files changed

+1945
-2842
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterAll.java

Lines changed: 35 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -49,59 +49,25 @@ public static <W extends BoundedWindow> OnceTrigger<W> of(
4949
return new AfterAll<W>(Arrays.<Trigger<W>>asList(triggers));
5050
}
5151

52-
private TriggerResult result(TriggerContext c) {
53-
// If all children have finished, then they must have each fired at least once.
54-
if (c.trigger().areAllSubtriggersFinished()) {
55-
return TriggerResult.FIRE_AND_FINISH;
56-
}
57-
58-
return TriggerResult.CONTINUE;
59-
}
60-
6152
@Override
62-
public TriggerResult onElement(OnElementContext c) throws Exception {
53+
public void onElement(OnElementContext c) throws Exception {
6354
for (ExecutableTrigger<W> subTrigger : c.trigger().unfinishedSubTriggers()) {
6455
// Since subTriggers are all OnceTriggers, they must either CONTINUE or FIRE_AND_FINISH.
6556
// invokeElement will automatically mark the finish bit if they return FIRE_AND_FINISH.
66-
subTrigger.invokeElement(c);
57+
subTrigger.invokeOnElement(c);
6758
}
68-
69-
return result(c);
7059
}
7160

7261
@Override
73-
public MergeResult onMerge(OnMergeContext c) throws Exception {
74-
// CONTINUE if merging returns CONTINUE for at least one sub-trigger
75-
// ALREADY_FINISHED if merging returns ALREADY_FINISHED for all sub-triggers and this
76-
// trigger itself was already finished in some window.
77-
// FIRE_AND_FINISH otherwise: It means this trigger is ready to fire (because all subtriggers
78-
// are satisfied) but has never fired as a whole.
79-
boolean anyContinue = false;
80-
boolean alreadyFinished = true;
62+
public void onMerge(OnMergeContext c) throws Exception {
8163
for (ExecutableTrigger<W> subTrigger : c.trigger().subTriggers()) {
82-
MergeResult result = subTrigger.invokeMerge(c);
83-
anyContinue |= !result.isFire() && !result.isFinish();
84-
alreadyFinished &= !result.isFire() && result.isFinish();
64+
subTrigger.invokeOnMerge(c);
8565
}
86-
87-
if (anyContinue) {
88-
return MergeResult.CONTINUE;
89-
} else if (alreadyFinished && c.trigger().finishedInAnyMergingWindow()) {
90-
return MergeResult.ALREADY_FINISHED;
91-
} else {
92-
return MergeResult.FIRE_AND_FINISH;
66+
boolean allFinished = true;
67+
for (ExecutableTrigger<W> subTrigger1 : c.trigger().subTriggers()) {
68+
allFinished &= c.forTrigger(subTrigger1).trigger().isFinished();
9369
}
94-
}
95-
96-
@Override
97-
public TriggerResult onTimer(OnTimerContext c) throws Exception {
98-
for (ExecutableTrigger<W> subTrigger : c.trigger().unfinishedSubTriggers()) {
99-
// Since subTriggers are all OnceTriggers, they must either CONTINUE or FIRE_AND_FINISH.
100-
// invokeTimer will automatically mark the finish bit if they return FIRE_AND_FINISH.
101-
subTrigger.invokeTimer(c);
102-
}
103-
104-
return result(c);
70+
c.trigger().setFinished(allFinished);
10571
}
10672

10773
@Override
@@ -121,4 +87,31 @@ public Instant getWatermarkThatGuaranteesFiring(W window) {
12187
public OnceTrigger<W> getContinuationTrigger(List<Trigger<W>> continuationTriggers) {
12288
return new AfterAll<W>(continuationTriggers);
12389
}
90+
91+
/**
92+
* {@inheritDoc}
93+
*
94+
* @return {@code true} if all subtriggers return {@code true}.
95+
*/
96+
@Override
97+
public boolean shouldFire(TriggerContext context) throws Exception {
98+
for (ExecutableTrigger<W> subtrigger : context.trigger().subTriggers()) {
99+
if (!context.forTrigger(subtrigger).trigger().isFinished()
100+
&& !subtrigger.invokeShouldFire(context)) {
101+
return false;
102+
}
103+
}
104+
return true;
105+
}
106+
107+
/**
108+
* Invokes {@link #onFire} for all subtriggers, eliding redundant calls to {@link #shouldFire}
109+
* because they all must be ready to fire.
110+
*/
111+
@Override
112+
public void onOnlyFiring(TriggerContext context) throws Exception {
113+
for (ExecutableTrigger<W> subtrigger : context.trigger().subTriggers()) {
114+
subtrigger.invokeOnFire(context);
115+
}
116+
}
124117
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Copyright (C) 2015 Google Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package com.google.cloud.dataflow.sdk.transforms.windowing;
18+
19+
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
20+
import com.google.cloud.dataflow.sdk.util.ReduceFn.MergingStateContext;
21+
import com.google.cloud.dataflow.sdk.util.ReduceFn.StateContext;
22+
import com.google.cloud.dataflow.sdk.util.TimeDomain;
23+
import com.google.cloud.dataflow.sdk.util.state.CombiningValueState;
24+
25+
import org.joda.time.Instant;
26+
27+
import java.util.List;
28+
29+
import javax.annotation.Nullable;
30+
31+
/**
32+
* A base class for triggers that happen after a processing time delay from the arrival
33+
* of the first element in a pane.
34+
*/
35+
abstract class AfterDelayFromFirstElement<W extends BoundedWindow> extends TimeTrigger<W> {
36+
37+
/**
38+
* To complete an implementation, return the desired time from the TriggerContext.
39+
*/
40+
@Nullable
41+
public abstract Instant getCurrentTime(Trigger<W>.TriggerContext context);
42+
43+
private final TimeDomain timeDomain;
44+
45+
public AfterDelayFromFirstElement(
46+
TimeDomain timeDomain, List<SerializableFunction<Instant, Instant>> timestampMappers) {
47+
super(timestampMappers);
48+
this.timeDomain = timeDomain;
49+
}
50+
51+
private Instant getTargetTimestamp(OnElementContext c) {
52+
return computeTargetTimestamp(c.currentProcessingTime());
53+
}
54+
55+
@Override
56+
public void prefetchOnElement(StateContext state) {
57+
state.access(DELAYED_UNTIL_TAG).get();
58+
}
59+
60+
@Override
61+
public void onElement(OnElementContext c) throws Exception {
62+
CombiningValueState<Instant, Instant> delayUntilState = c.state().access(DELAYED_UNTIL_TAG);
63+
Instant oldDelayUntil = delayUntilState.get().read();
64+
65+
// Since processing time can only advance, resulting in target wake-up times we would
66+
// ignore anyhow, we don't bother with it if it is already set.
67+
if (oldDelayUntil != null) {
68+
return;
69+
}
70+
71+
Instant targetTimestamp = getTargetTimestamp(c);
72+
delayUntilState.add(targetTimestamp);
73+
c.setTimer(targetTimestamp, timeDomain);
74+
}
75+
76+
@Override
77+
public void prefetchOnMerge(MergingStateContext state) {
78+
state.mergingAccess(DELAYED_UNTIL_TAG).get();
79+
}
80+
81+
@Override
82+
public void onMerge(OnMergeContext c) throws Exception {
83+
// If the trigger is already finished, there is no way it will become re-activated
84+
if (c.trigger().isFinished()) {
85+
return;
86+
}
87+
88+
// Determine the earliest point across all the windows, and delay to that.
89+
CombiningValueState<Instant, Instant> mergingDelays =
90+
c.state().mergingAccess(DELAYED_UNTIL_TAG);
91+
92+
Instant earliestTargetTime = mergingDelays.get().read();
93+
if (earliestTargetTime != null) {
94+
mergingDelays.clear();
95+
mergingDelays.add(earliestTargetTime);
96+
c.setTimer(earliestTargetTime, timeDomain);
97+
}
98+
}
99+
100+
@Override
101+
public void prefetchShouldFire(StateContext state) {
102+
state.access(DELAYED_UNTIL_TAG).get();
103+
}
104+
105+
@Override
106+
public void clear(TriggerContext c) throws Exception {
107+
c.state().access(DELAYED_UNTIL_TAG).clear();
108+
}
109+
110+
@Override
111+
public Instant getWatermarkThatGuaranteesFiring(W window) {
112+
return BoundedWindow.TIMESTAMP_MAX_VALUE;
113+
}
114+
115+
@Override
116+
public boolean shouldFire(Trigger<W>.TriggerContext context) throws Exception {
117+
Instant delayedUntil = context.state().access(DELAYED_UNTIL_TAG).get().read();
118+
return delayedUntil != null
119+
&& getCurrentTime(context) != null
120+
&& getCurrentTime(context).isAfter(delayedUntil);
121+
}
122+
123+
@Override
124+
protected void onOnlyFiring(Trigger<W>.TriggerContext context) throws Exception {
125+
clear(context);
126+
}
127+
}

sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterEach.java

Lines changed: 52 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@
1616

1717
package com.google.cloud.dataflow.sdk.transforms.windowing;
1818

19+
import static com.google.common.base.Preconditions.checkArgument;
20+
1921
import com.google.cloud.dataflow.sdk.annotations.Experimental;
2022
import com.google.cloud.dataflow.sdk.util.ExecutableTrigger;
21-
import com.google.common.base.Preconditions;
2223

2324
import org.joda.time.Instant;
2425

2526
import java.util.Arrays;
26-
import java.util.Iterator;
2727
import java.util.List;
2828

2929
/**
@@ -50,7 +50,7 @@ public class AfterEach<W extends BoundedWindow> extends Trigger<W> {
5050

5151
private AfterEach(List<Trigger<W>> subTriggers) {
5252
super(subTriggers);
53-
Preconditions.checkArgument(subTriggers.size() > 1);
53+
checkArgument(subTriggers.size() > 1);
5454
}
5555

5656
/**
@@ -61,94 +61,75 @@ public static <W extends BoundedWindow> Trigger<W> inOrder(Trigger<W>... trigger
6161
return new AfterEach<W>(Arrays.<Trigger<W>>asList(triggers));
6262
}
6363

64-
private TriggerResult result(TriggerContext c, TriggerResult subResult)
65-
throws Exception {
66-
if (subResult.isFire()) {
67-
return c.trigger().areAllSubtriggersFinished()
68-
? TriggerResult.FIRE_AND_FINISH : TriggerResult.FIRE;
64+
@Override
65+
public void onElement(OnElementContext c) throws Exception {
66+
if (!c.trigger().isMerging()) {
67+
// If merges are not possible, we need only run the first unfinished subtrigger
68+
c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c);
6969
} else {
70-
return TriggerResult.CONTINUE;
70+
// If merges are possible, we need to run all subtriggers in parallel
71+
for (ExecutableTrigger<W> subTrigger : c.trigger().subTriggers()) {
72+
// Even if the subTrigger is done, it may be revived via merging and must have
73+
// adequate state.
74+
subTrigger.invokeOnElement(c);
75+
}
7176
}
7277
}
7378

7479
@Override
75-
public TriggerResult onElement(OnElementContext c) throws Exception {
76-
Iterator<ExecutableTrigger<W>> iterator = c.trigger().unfinishedSubTriggers().iterator();
77-
78-
// If all the sub-triggers have finished, we should have already finished, so we know there is
79-
// at least one unfinished trigger.
80-
TriggerResult firstResult = iterator.next().invokeElement(c);
81-
82-
// If onMerge might be called, we need to make sure we have proper state for future triggers.
83-
if (c.trigger().isMerging()) {
84-
if (firstResult.isFire()) {
85-
// If we're firing, clear out all of the later subtriggers, since we don't want to pollute
86-
// their state.
87-
resetRemaining(c, iterator);
80+
public void onMerge(OnMergeContext context) throws Exception {
81+
// If merging makes a subtrigger no-longer-finished, it will automatically
82+
// begin participating in shouldFire and onFire appropriately.
83+
84+
// All the following triggers are retroactively "not started" but that is
85+
// also automatic because they are cleared whenever this trigger
86+
// fires.
87+
boolean priorTriggersAllFinished = true;
88+
for (ExecutableTrigger<W> subTrigger : context.trigger().subTriggers()) {
89+
if (priorTriggersAllFinished) {
90+
subTrigger.invokeOnMerge(context);
91+
priorTriggersAllFinished &= context.forTrigger(subTrigger).trigger().isFinished();
8892
} else {
89-
// Otherwise, iterate over all of them to build up some state.
90-
while (iterator.hasNext()) {
91-
iterator.next().invokeElement(c);
92-
}
93+
subTrigger.invokeClear(context);
9394
}
9495
}
95-
96-
return result(c, firstResult);
96+
updateFinishedState(context);
9797
}
9898

9999
@Override
100-
public MergeResult onMerge(OnMergeContext c) throws Exception {
101-
// Iterate over the sub-triggers to identify the "current" sub-trigger.
102-
Iterator<ExecutableTrigger<W>> iterator = c.trigger().subTriggers().iterator();
103-
while (iterator.hasNext()) {
104-
ExecutableTrigger<W> subTrigger = iterator.next();
105-
106-
MergeResult mergeResult = subTrigger.invokeMerge(c);
107-
108-
if (MergeResult.CONTINUE.equals(mergeResult)) {
109-
resetRemaining(c, iterator);
110-
return MergeResult.CONTINUE;
111-
} else if (MergeResult.FIRE.equals(mergeResult)) {
112-
resetRemaining(c, iterator);
113-
return MergeResult.FIRE;
114-
} else if (MergeResult.FIRE_AND_FINISH.equals(mergeResult)) {
115-
resetRemaining(c, iterator);
116-
return c.trigger().areAllSubtriggersFinished()
117-
? MergeResult.FIRE_AND_FINISH : MergeResult.FIRE;
118-
}
119-
}
120-
121-
// If we get here, all the merges indicated they were finished, which means there was at least
122-
// one merged window in which the triggers had all already finished. Given that, this AfterEach
123-
// would have already finished in that window as well. Since the window was still in the window
124-
// set for merging, we can return FINISHED (because we were finished in that window) and we also
125-
// know that there must be another trigger (parent or sibling) which hasn't finished yet, which
126-
// will FIRE, CONTINUE, or FIRE_AND_FINISH.
127-
return MergeResult.ALREADY_FINISHED;
100+
public Instant getWatermarkThatGuaranteesFiring(W window) {
101+
// This trigger will fire at least once when the first trigger in the sequence
102+
// fires at least once.
103+
return subTriggers.get(0).getWatermarkThatGuaranteesFiring(window);
128104
}
129105

130-
private void resetRemaining(
131-
TriggerContext c, Iterator<ExecutableTrigger<W>> triggers) throws Exception {
132-
while (triggers.hasNext()) {
133-
c.forTrigger(triggers.next()).trigger().resetTree();
134-
}
106+
@Override
107+
public Trigger<W> getContinuationTrigger(List<Trigger<W>> continuationTriggers) {
108+
return Repeatedly.forever(new AfterFirst<W>(continuationTriggers));
135109
}
136110

137111
@Override
138-
public TriggerResult onTimer(OnTimerContext c) throws Exception {
139-
// Only deliver to the currently active subtrigger
140-
return result(c, c.trigger().firstUnfinishedSubTrigger().invokeTimer(c));
112+
public boolean shouldFire(Trigger<W>.TriggerContext context) throws Exception {
113+
ExecutableTrigger<W> firstUnfinished = context.trigger().firstUnfinishedSubTrigger();
114+
return firstUnfinished.invokeShouldFire(context);
141115
}
142116

143117
@Override
144-
public Instant getWatermarkThatGuaranteesFiring(W window) {
145-
// This trigger will fire at least once when the first trigger in the sequence
146-
// fires at least once.
147-
return subTriggers.get(0).getWatermarkThatGuaranteesFiring(window);
118+
public void onFire(Trigger<W>.TriggerContext context) throws Exception {
119+
context.trigger().firstUnfinishedSubTrigger().invokeOnFire(context);
120+
121+
// Reset all subtriggers if in a merging context; any may be revived by merging so they are
122+
// all run in parallel for each pending pane.
123+
if (context.trigger().isMerging()) {
124+
for (ExecutableTrigger<W> subTrigger : context.trigger().subTriggers()) {
125+
subTrigger.invokeClear(context);
126+
}
127+
}
128+
129+
updateFinishedState(context);
148130
}
149131

150-
@Override
151-
public Trigger<W> getContinuationTrigger(List<Trigger<W>> continuationTriggers) {
152-
return Repeatedly.forever(new AfterFirst<W>(continuationTriggers));
132+
private void updateFinishedState(TriggerContext context) {
133+
context.trigger().setFinished(context.trigger().firstUnfinishedSubTrigger() == null);
153134
}
154135
}

0 commit comments

Comments
 (0)