Skip to content
This repository has been archived by the owner on Nov 11, 2022. It is now read-only.

Backport apache/incubator-beam#65 #167

Merged
merged 4 commits into from
Apr 4, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Basic non-null checks
  • Loading branch information
Mark Shields authored and dhalperi committed Mar 25, 2016
commit ff6617f6d22c487991d7edbbf4a4b607b2b2f6ca
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,11 @@ private Collection<W> processElement(WindowedValue<InputT> value) throws Excepti
directContext.timestamp(),
directContext.timers(),
directContext.state());

// At this point, if triggerRunner.shouldFire was true before the processValue then
// triggerRunner.shouldFire is still true after the processValue. In other words adding
// values cannot take a trigger state from firing to non-firing.
// (We don't actually assert this since it is too slow.)
}

return windows;
Expand Down Expand Up @@ -568,6 +573,11 @@ public void onTimer(TimerData timer) throws Exception {
boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
&& timer.getTimestamp().equals(window.maxTimestamp());
if (isEndOfWindow) {
// If the window strategy trigger includes a watermark trigger then at this point
// there should be no data holds, either because we'd already cleared them on an
// earlier onTrigger, or because we just cleared them on the above emitIfAppropriate.
// We could assert this but it is very expensive.

// Since we are processing an on-time firing we should schedule the garbage collection
// timer. (If getAllowedLateness is zero then the timer event will be considered a
// cleanup event and handled by the above).
Expand Down Expand Up @@ -715,8 +725,11 @@ private void onTrigger(
ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
boolean isFinished)
throws Exception {
Instant inputWM = timerInternals.currentInputWatermarkTime();
Preconditions.checkNotNull(inputWM);

// Prefetch necessary states
ReadableState<Instant> outputTimestampFuture =
ReadableState<WatermarkHold.OldAndNewHolds> outputTimestampFuture =
watermarkHold.extractAndRelease(renamedContext, isFinished).readLater();
ReadableState<PaneInfo> paneFuture =
paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater();
Expand All @@ -729,7 +742,41 @@ private void onTrigger(
// Calculate the pane info.
final PaneInfo pane = paneFuture.read();
// Extract the window hold, and as a side effect clear it.
final Instant outputTimestamp = outputTimestampFuture.read();

WatermarkHold.OldAndNewHolds pair = outputTimestampFuture.read();
final Instant outputTimestamp = pair.oldHold;
@Nullable Instant newHold = pair.newHold;

if (newHold != null && inputWM != null) {
// We can't be finished yet.
Preconditions.checkState(
!isFinished, "new hold at %s but finished %s", newHold, directContext.window());
// The hold cannot be behind the input watermark.
Preconditions.checkState(
!newHold.isBefore(inputWM), "new hold %s is before input watermark %s", newHold, inputWM);
if (newHold.isAfter(directContext.window().maxTimestamp())) {
// The hold must be for garbage collection, which can't have happened yet.
Preconditions.checkState(
newHold.isEqual(
directContext.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness())),
"new hold %s should be at garbage collection for window %s plus %s",
newHold,
directContext.window(),
windowingStrategy.getAllowedLateness());
} else {
// The hold must be for the end-of-window, which can't have happened yet.
Preconditions.checkState(
newHold.isEqual(directContext.window().maxTimestamp()),
"new hold %s should be at end of window %s",
newHold,
directContext.window());
Preconditions.checkState(
!isEndOfWindow,
"new hold at %s for %s but this is the watermark trigger",
newHold,
directContext.window());
}
}

// Only emit a pane if it has data or empty panes are observable.
if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ public boolean shouldFire(W window, Timers timers, StateAccessor<?> state) throw
}

public void onFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
// shouldFire should be true (we are being asked to fire).
// However it is too expensive to assert.
FinishedTriggersBitSet finishedSet =
readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
Trigger<W>.TriggerContext context = contextFactory.base(window, timers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ private Instant addElementHold(ReduceFn<?, ?, ?, W>.ProcessValueContext context)

Instant outputWM = timerInternals.currentOutputWatermarkTime();
Instant inputWM = timerInternals.currentInputWatermarkTime();
Preconditions.checkNotNull(inputWM);

// Only add the hold if we can be sure:
// - the backend will be able to respect it
Expand Down Expand Up @@ -287,6 +288,8 @@ private Instant addEndOfWindowHold(ReduceFn<?, ?, ?, W>.Context context) {
// by the end of window (ie the end of window is at or ahead of the input watermark).
Instant outputWM = timerInternals.currentOutputWatermarkTime();
Instant inputWM = timerInternals.currentInputWatermarkTime();
Preconditions.checkNotNull(inputWM);

String which;
boolean tooLate;
Instant eowHold = context.window().maxTimestamp();
Expand Down Expand Up @@ -329,6 +332,8 @@ private Instant addGarbageCollectionHold(ReduceFn<?, ?, ?, W>.Context context) {
Instant gcHold = context.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness());
Instant outputWM = timerInternals.currentOutputWatermarkTime();
Instant inputWM = timerInternals.currentInputWatermarkTime();
Preconditions.checkNotNull(inputWM);

WindowTracing.trace(
"WatermarkHold.addGarbageCollectionHold: garbage collection at {} hold for "
+ "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
Expand Down Expand Up @@ -368,6 +373,19 @@ public void onMerge(ReduceFn<?, ?, ?, W>.OnMergeContext context) {
addEndOfWindowOrGarbageCollectionHolds(context);
}

/**
* Result of {@link #extractAndRelease}: the hold that was read (and cleared) for the pane
* being emitted, paired with whatever hold was put in place afterwards.
*
* <p>Plain immutable holder: both fields are public and final, and the constructor stores its
* arguments as given without validation.
*/
public static class OldAndNewHolds {
/**
* The hold extracted for the current pane. As produced by {@link #extractAndRelease} this is
* the earlier of the element and extra holds, replaced by the end of the window when no hold
* was set or when the hold lay beyond the end of the window.
*/
public final Instant oldHold;
/**
* The hold re-established after clearing, or null if none. {@link #extractAndRelease} leaves
* this null when called with {@code isFinished} set, since no further hold is added then.
*/
@Nullable public final Instant newHold;

/**
* @param oldHold the hold that was extracted and cleared
* @param newHold the replacement hold left behind, or null if there is none
*/
public OldAndNewHolds(Instant oldHold, @Nullable Instant newHold) {
this.oldHold = oldHold;
this.newHold = newHold;
}
}

/**
* Return (a future for) the earliest hold for {@code context}. Clear all the holds after
* reading, but add/restore an end-of-window or garbage collection hold if required.
Expand All @@ -377,46 +395,46 @@ public void onMerge(ReduceFn<?, ?, ?, W>.OnMergeContext context) {
* elements in the current pane. If there is no such value the timestamp is the end
* of the window.
*/
public ReadableState<Instant> extractAndRelease(
public ReadableState<OldAndNewHolds> extractAndRelease(
final ReduceFn<?, ?, ?, W>.Context context, final boolean isFinished) {
WindowTracing.debug(
"extractAndRelease: for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
context.key(), context.window(), timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
final WatermarkHoldState<W> elementHoldState = context.state().access(elementHoldTag);
final WatermarkHoldState<BoundedWindow> extraHoldState = context.state().access(EXTRA_HOLD_TAG);
return new ReadableState<Instant>() {
return new ReadableState<OldAndNewHolds>() {
@Override
public ReadableState<Instant> readLater() {
public ReadableState<OldAndNewHolds> readLater() {
elementHoldState.readLater();
extraHoldState.readLater();
return this;
}

@Override
public Instant read() {
public OldAndNewHolds read() {
// Read both the element and extra holds.
Instant elementHold = elementHoldState.read();
Instant extraHold = extraHoldState.read();
Instant hold;
Instant oldHold;
// Find the minimum, accounting for null.
if (elementHold == null) {
hold = extraHold;
oldHold = extraHold;
} else if (extraHold == null) {
hold = elementHold;
oldHold = elementHold;
} else if (elementHold.isBefore(extraHold)) {
hold = elementHold;
oldHold = elementHold;
} else {
hold = extraHold;
oldHold = extraHold;
}
if (hold == null || hold.isAfter(context.window().maxTimestamp())) {
if (oldHold == null || oldHold.isAfter(context.window().maxTimestamp())) {
// If no hold (eg because all elements came in behind the output watermark), or
// the hold was for garbage collection, take the end of window as the result.
WindowTracing.debug(
"WatermarkHold.extractAndRelease.read: clipping from {} to end of window "
+ "for key:{}; window:{}",
hold, context.key(), context.window());
hold = context.window().maxTimestamp();
oldHold, context.key(), context.window());
oldHold = context.window().maxTimestamp();
}
WindowTracing.debug("WatermarkHold.extractAndRelease.read: clearing for key:{}; window:{}",
context.key(), context.window());
Expand All @@ -425,13 +443,14 @@ public Instant read() {
elementHoldState.clear();
extraHoldState.clear();

@Nullable Instant newHold = null;
if (!isFinished) {
// Only need to leave behind an end-of-window or garbage collection hold
// if future elements will be processed.
addEndOfWindowOrGarbageCollectionHolds(context);
newHold = addEndOfWindowOrGarbageCollectionHolds(context);
}

return hold;
return new OldAndNewHolds(oldHold, newHold);
}
};
}
Expand All @@ -447,4 +466,12 @@ public void clearHolds(ReduceFn<?, ?, ?, W>.Context context) {
context.state().access(elementHoldTag).clear();
context.state().access(EXTRA_HOLD_TAG).clear();
}

/**
* Peek at the element (data) hold currently stored for {@code context} without modifying it.
* Only the state under {@code elementHoldTag} is consulted; nothing is cleared.
* Intended for debugging only.
*
* @param context the context whose state is inspected
* @return the stored element hold, or null if none is set
*/
@Nullable
public Instant getDataCurrent(ReduceFn<?, ?, ?, W>.Context context) {
  final WatermarkHoldState<W> dataHoldState = context.state().access(elementHoldTag);
  return dataHoldState.read();
}
}