Skip to content

Commit

Permalink
Repeatedly#onFire should clear all finished bits
Browse files Browse the repository at this point in the history
Previously, Repeatedly#onFire only cleared the finished bits associated
with the root of the sub-tree, as demonstrated by the new unit tests.

This led to problems with AfterFirst#shouldFire, which checked to see if
any of the sub-triggers have their finished bits set.

Now, Repeatedly#onFire calls #resetTree, which clears all the finished
bits in the entire sub-tree.
  • Loading branch information
kennknowles committed Apr 1, 2016
1 parent e10e1fb commit e530981
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ public void onFire(TriggerContext context) throws Exception {
getRepeated(context).invokeOnFire(context);

if (context.trigger().isFinished(REPEATED)) {
context.trigger().setFinished(false, REPEATED);
getRepeated(context).invokeClear(context);
// Reset tree will recursively clear the finished bits, and invoke clear.
context.forTrigger(getRepeated(context)).trigger().resetTree();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,87 @@ public void testShouldFireAfterMerge() throws Exception {
IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
assertTrue(tester.shouldFire(mergedWindow));
}

@Test
public void testRepeatedlyAfterFirstElementCount() throws Exception {
SimpleTriggerTester<GlobalWindow> tester =
TriggerTester.forTrigger(
Repeatedly.forever(
AfterFirst.<GlobalWindow>of(
AfterProcessingTime.<GlobalWindow>pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(15)),
AfterPane.<GlobalWindow>elementCountAtLeast(5))),
new GlobalWindows());

GlobalWindow window = GlobalWindow.INSTANCE;

tester.injectElements(1);
assertFalse(tester.shouldFire(window));

tester.injectElements(2, 3, 4, 5);
assertTrue(tester.shouldFire(window));
tester.fireIfShouldFire(window);
assertFalse(tester.shouldFire(window));
}

@Test
public void testRepeatedlyAfterFirstProcessingTime() throws Exception {
SimpleTriggerTester<GlobalWindow> tester =
TriggerTester.forTrigger(
Repeatedly.forever(
AfterFirst.<GlobalWindow>of(
AfterProcessingTime.<GlobalWindow>pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(15)),
AfterPane.<GlobalWindow>elementCountAtLeast(5))),
new GlobalWindows());

GlobalWindow window = GlobalWindow.INSTANCE;

tester.injectElements(1);
assertFalse(tester.shouldFire(window));

tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15)));
assertTrue(tester.shouldFire(window));
tester.fireIfShouldFire(window);
assertFalse(tester.shouldFire(window));
}

@Test
public void testRepeatedlyElementCount() throws Exception {
SimpleTriggerTester<GlobalWindow> tester =
TriggerTester.forTrigger(
Repeatedly.forever(AfterPane.<GlobalWindow>elementCountAtLeast(5)),
new GlobalWindows());

GlobalWindow window = GlobalWindow.INSTANCE;

tester.injectElements(1);
assertFalse(tester.shouldFire(window));

tester.injectElements(2, 3, 4, 5);
assertTrue(tester.shouldFire(window));
tester.fireIfShouldFire(window);
assertFalse(tester.shouldFire(window));
}

@Test
public void testRepeatedlyProcessingTime() throws Exception {
SimpleTriggerTester<GlobalWindow> tester =
TriggerTester.forTrigger(
Repeatedly.forever(
AfterProcessingTime.<GlobalWindow>pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(15))),
new GlobalWindows());

GlobalWindow window = GlobalWindow.INSTANCE;

tester.injectElements(1);
assertFalse(tester.shouldFire(window));

tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15)));
assertTrue(tester.shouldFire(window));
tester.fireIfShouldFire(window);
assertFalse(tester.shouldFire(window));
}

}

0 comments on commit e530981

Please sign in to comment.