Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@
*/
public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWindowSet<W> {
private final WindowFn<Object, W> windowFn;

@Nullable
private Map<W, Set<W>> activeWindowToStateAddressWindows;
private final Map<W, Set<W>> activeWindowToStateAddressWindows;

/**
* As above, but only for EPHEMERAL windows. Does not need to be persisted.
Expand All @@ -94,16 +92,14 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
* MERGED. Otherwise W1 is EPHEMERAL.
* </ul>
*/
@Nullable
private Map<W, W> windowToActiveWindow;
private final Map<W, W> windowToActiveWindow;

/**
* Deep clone of {@link #activeWindowToStateAddressWindows} as of last commit.
*
* <p>Used to avoid writing to state if no changes have been made during the work unit.
*/
@Nullable
private Map<W, Set<W>> originalActiveWindowToStateAddressWindows;
private final Map<W, Set<W>> originalActiveWindowToStateAddressWindows;

/**
* Handle representing our state in the backend.
Expand Down Expand Up @@ -195,7 +191,12 @@ public void addActive(W window) {

@Override
public void remove(W window) {
for (W stateAddressWindow : activeWindowToStateAddressWindows.get(window)) {
Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
if (stateAddressWindows == null) {
// Window is no longer active.
return;
}
for (W stateAddressWindow : stateAddressWindows) {
windowToActiveWindow.remove(stateAddressWindow);
}
activeWindowToStateAddressWindows.remove(window);
Expand Down Expand Up @@ -522,7 +523,7 @@ private static <W> Map<W, Set<W>> emptyIfNull(@Nullable Map<W, Set<W>> multimap)
private static <W> Map<W, Set<W>> deepCopy(Map<W, Set<W>> multimap) {
Map<W, Set<W>> newMultimap = new HashMap<>();
for (Map.Entry<W, Set<W>> entry : multimap.entrySet()) {
newMultimap.put(entry.getKey(), new LinkedHashSet<W>(entry.getValue()));
newMultimap.put(entry.getKey(), new LinkedHashSet<>(entry.getValue()));
}
return newMultimap;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ public void onTimer(TimerData timer) throws Exception {
// - The trigger may implement isClosed as constant false.
// - If the window function does not support windowing then all windows will be considered
// active.
// So we must combine the above.
// So we must take conjunction of activeWindows and triggerRunner state.
boolean windowIsActive =
activeWindows.isActive(window) && !triggerRunner.isClosed(directContext.state());

Expand Down Expand Up @@ -602,7 +602,8 @@ private void clearAllState(
boolean windowIsActive)
throws Exception {
if (windowIsActive) {
// Since window is still active the trigger has not closed.
// Since both the window is in the active window set AND the trigger was not yet closed,
// it is possible we still have state.
reduceFn.clearState(renamedContext);
watermarkHold.clearHolds(renamedContext);
nonEmptyPanes.clearPane(renamedContext.state());
Expand All @@ -622,7 +623,10 @@ private void clearAllState(
}
}
paneInfoTracker.clear(directContext.state());
activeWindows.remove(directContext.window());
if (activeWindows.isActive(directContext.window())) {
// Don't need to track address state windows anymore.
activeWindows.remove(directContext.window());
}
// We'll never need to test for the trigger being closed again.
triggerRunner.clearFinished(directContext.state());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,43 @@ public void testMergeBeforeFinalizing() throws Exception {
equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
}

/**
* It is possible for a session window's trigger to be closed at the point at which
* the (merged) session window is garbage collected. Make sure we don't accidentally
* assume the window is still active.
*/
@Test
public void testMergingWithCloseBeforeGC() throws Exception {
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
ClosingBehavior.FIRE_IF_NON_EMPTY);

// Two elements in two overlapping session windows.
tester.injectElements(
TimestampedValue.of(1, new Instant(1)), // in [1, 11)
TimestampedValue.of(10, new Instant(10))); // in [10, 20)

// Close the trigger, but the gargbage collection timer is still pending.
when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
triggerShouldFinish(mockTrigger);
tester.advanceInputWatermark(new Instant(30));

// Now the garbage collection timer will fire, finding the trigger already closed.
tester.advanceInputWatermark(new Instant(100));

List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
assertThat(output.size(), equalTo(1));
assertThat(output.get(0),
isSingleWindowedValue(containsInAnyOrder(1, 10),
1, // timestamp
1, // window start
20)); // window end
assertThat(
output.get(0).getPane(),
equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
}

/**
* Tests that when data is assigned to multiple windows but some of those windows have
* had their triggers finish, then the data is dropped and counted accurately.
Expand Down