From e5309810a50a69fc20b72115c5be5b1bc1570622 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 31 Mar 2016 15:03:59 -0700 Subject: [PATCH] Repeatedly#onFire should clear all finished bits Previously, Repeatedly#onFire only cleared the finished bits associated with the root of the sub-tree, as demonstrated by the new unit tests. This led to problems with AfterFirst#shouldFire, which checked to see if any of the sub-triggers have their finished bits set. Now, Repeatedly#onFire calls #resetTree, which clears all the finished bits in the entire sub-tree. --- .../sdk/transforms/windowing/Repeatedly.java | 4 +- .../transforms/windowing/RepeatedlyTest.java | 83 +++++++++++++++++++ 2 files changed, 85 insertions(+), 2 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java index e77e2a1203..5f29c8ffe3 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java @@ -89,8 +89,8 @@ public void onFire(TriggerContext context) throws Exception { getRepeated(context).invokeOnFire(context); if (context.trigger().isFinished(REPEATED)) { - context.trigger().setFinished(false, REPEATED); - getRepeated(context).invokeClear(context); + // Reset tree will recursively clear the finished bits, and invoke clear. + context.forTrigger(getRepeated(context)).trigger().resetTree(); } } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/RepeatedlyTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/RepeatedlyTest.java index f445b52565..e14c15c4e2 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/RepeatedlyTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/RepeatedlyTest.java @@ -125,4 +125,87 @@ public void testShouldFireAfterMerge() throws Exception { IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); assertTrue(tester.shouldFire(mergedWindow)); } + + @Test + public void testRepeatedlyAfterFirstElementCount() throws Exception { + SimpleTriggerTester tester = + TriggerTester.forTrigger( + Repeatedly.forever( + AfterFirst.of( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(15)), + AfterPane.elementCountAtLeast(5))), + new GlobalWindows()); + + GlobalWindow window = GlobalWindow.INSTANCE; + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.injectElements(2, 3, 4, 5); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.shouldFire(window)); + } + + @Test + public void testRepeatedlyAfterFirstProcessingTime() throws Exception { + SimpleTriggerTester tester = + TriggerTester.forTrigger( + Repeatedly.forever( + AfterFirst.of( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(15)), + AfterPane.elementCountAtLeast(5))), + new GlobalWindows()); + + GlobalWindow window = GlobalWindow.INSTANCE; + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15))); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.shouldFire(window)); + } + + @Test + public void testRepeatedlyElementCount() throws Exception { + SimpleTriggerTester tester = + TriggerTester.forTrigger( + Repeatedly.forever(AfterPane.elementCountAtLeast(5)), + new GlobalWindows()); + + GlobalWindow window = GlobalWindow.INSTANCE; + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.injectElements(2, 3, 4, 5); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.shouldFire(window)); + } + + @Test + public void testRepeatedlyProcessingTime() throws Exception { + SimpleTriggerTester tester = + TriggerTester.forTrigger( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(15))), + new GlobalWindows()); + + GlobalWindow window = GlobalWindow.INSTANCE; + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15))); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.shouldFire(window)); + } + }