
[SPARK-10224][Streaming] Fix the issue that blockIntervalTimer won't call updateCurrentBuffer when stopping #8417


Closed
wants to merge 5 commits into apache:master from zsxwing:SPARK-10224

Conversation

@zsxwing
Member

zsxwing commented Aug 25, 2015

`blockIntervalTimer.stop(interruptTimer = false)` doesn't guarantee calling `updateCurrentBuffer`, so it's possible that `blockIntervalTimer` will exit while `currentBuffer` is not empty. In that case the data in `currentBuffer` will be lost.

To reproduce it, you can add `Thread.sleep(200)` at this line in RecurringTimer.scala (`logDebug("Callback for " + name + " called at time " + prevTime)`, see https://github.com/apache/spark/blob/69c9c177160e32a2fbc9b36ecc52156077fca6fc/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala#L100) and run `StreamingContextSuite`.
I cannot write a unit test to reproduce it because I cannot find an approach to force `RecurringTimer` to suspend at this line for a few milliseconds.

There was a failure in Jenkins here: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/41455/console

This PR updates `RecurringTimer` to make sure `stop(interruptTimer = false)` will call `callback` at least once after the `stop` method is called.
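
The intended behavior can be sketched as follows. This is a minimal, self-contained sketch with illustrative names and a simplified system clock, not the actual patch:

```scala
// Minimal sketch (not the actual patch): a recurring timer whose
// non-interrupting stop() still fires the callback one last time, so data
// buffered after the last tick is not silently dropped.
class SimpleRecurringTimer(period: Long, callback: Long => Unit, name: String) {

  @volatile private var stopped = false
  private var nextTime = -1L

  private val thread = new Thread("RecurringTimer - " + name) {
    override def run(): Unit = loop()
  }

  def start(startTime: Long): Unit = {
    nextTime = startTime
    thread.start()
  }

  def stop(interruptTimer: Boolean): Unit = {
    stopped = true
    if (interruptTimer) thread.interrupt()
    thread.join()
  }

  private def triggerActionForNextInterval(): Unit = {
    // Wait until the scheduled time (simplified: system clock), then fire.
    val sleepMs = nextTime - System.currentTimeMillis()
    if (sleepMs > 0) Thread.sleep(sleepMs)
    callback(nextTime)
    nextTime += period
  }

  private def loop(): Unit = {
    try {
      while (!stopped) {
        triggerActionForNextInterval()
      }
      // The key point described above: after a graceful stop, trigger the
      // callback once more so the last partial buffer is still processed.
      triggerActionForNextInterval()
    } catch {
      case _: InterruptedException => // interrupted stop: exit immediately
    }
  }
}
```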

@@ -184,9 +184,10 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter {
// Verify that the final data is present in the final generated block and
// pushed before complete stop
assert(blockGenerator.isStopped() === false) // generator has not stopped yet
clock.advance(blockIntervalMs) // force block generation
Member Author

There is a race condition here. If `updateCurrentBuffer` finishes before `blockIntervalTimer.stop(interruptTimer = false)`, this test will fail because `RecurringTimer` is blocking in `clock.waitTillTime`.

Therefore I called `clock.advance(blockIntervalMs)` in every trial to make sure we won't block in `clock.waitTillTime`.
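
As a sketch of that pattern (the helper and its name are made up for illustration; `ManualClock` is Spark's package-private test clock and ScalaTest's `eventually` drives the retries):

```scala
import org.apache.spark.util.ManualClock
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Advance the manual clock on every retry so the timer thread cannot stay
// parked in clock.waitTillTime while we wait for a condition (for example
// "the generator has fully stopped") to become true.
def advanceClockUntil(clock: ManualClock, blockIntervalMs: Long)(condition: => Boolean): Unit = {
  eventually(timeout(10.seconds), interval(10.milliseconds)) {
    clock.advance(blockIntervalMs) // force the next block-generation tick
    assert(condition)
  }
}
```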

@SparkQA

SparkQA commented Aug 25, 2015

Test build #41534 has finished for PR 8417 at commit 94f108b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14
Contributor

@tdas

@zsxwing
Member Author

zsxwing commented Sep 21, 2015

@tdas let me try to use the following table to explain the issue. Assume currentBuffer is empty at the beginning, and there are 3 threads running in the following execution order:

| execution order | thread 1 (`RecurringTimer.loop()`) | thread 2 (`BlockGenerator.stop()`) | thread 3 |
| --- | --- | --- | --- |
| 1 | `prevTime = nextTime` | | |
| 2 | `nextTime += period` | | |
| 3 | `logDebug("Callback for " + name + " called at time " + prevTime)` | | |
| 4 | | | `BlockGenerator.addData("1")` |
| 5 | | | `BlockGenerator.addData("2")` |
| 6 | | `state = StoppedAddingData` | |
| 7 | | `blockIntervalTimer.stop(interruptTimer = false)` | |
| 8 | `while (!stopped) {` | | |
| 9 | exit `RecurringTimer.loop` | | |

After step 9, currentBuffer is [1, 2] but won't be consumed since RecurringTimer has already exited.

var isCurrentBufferEmpty = synchronized { currentBuffer.isEmpty }
while (!isCurrentBufferEmpty) {
  Thread.sleep(blockIntervalMs)
  isCurrentBufferEmpty = synchronized { currentBuffer.isEmpty }
}
Contributor

A better fix would be to ensure that `updateCurrentBuffer` is called when `blockIntervalTimer` is stopped. This can be done by simply adding a parameter to `RecurringTimer` to ensure that the callback is made in the `stop` method.

The advantage is that we won't be burning CPU here.

@tdas
Contributor

tdas commented Sep 21, 2015

@zsxwing I got the race condition already, but thanks for adding that table; it makes it very, very obvious. Regarding the solution, I think I agree with @harishreedharan at a high level. It is better to ensure that the existing assumption about RecurringTimer is actually met. There are a number of uses of RecurringTimer, and it's best if it gives stronger guarantees rather than having higher layers work around its limitations. So could you make sure it is guaranteed that RecurringTimer will call the function one more time after stop is called?

@zsxwing
Member Author

zsxwing commented Sep 22, 2015

How about adding an onStop parameter like this:

class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, onStop: (Long) => Unit, name: String)

That way we could do different work when stopping `RecurringTimer`. By default, `onStop` does nothing.
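
To make the proposal concrete, here is a hypothetical sketch of that shape (the looping part is elided and every name is an assumption, not the final change):

```scala
// Hypothetical sketch of the proposed onStop hook; not the final change.
class RecurringTimerWithOnStop(
    period: Long,
    callback: Long => Unit,
    onStop: Long => Unit = _ => (), // does nothing by default
    name: String) {

  @volatile private var prevTime = -1L

  // ... the existing loop keeps calling `callback` every `period` ms and
  // updating `prevTime`, exactly as RecurringTimer does today ...

  def stop(interruptTimer: Boolean): Long = {
    // After the loop has exited, let the caller do its final work; e.g.
    // BlockGenerator could pass updateCurrentBuffer here to flush currentBuffer.
    onStop(prevTime)
    prevTime
  }
}
```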

@tdas
Contributor

tdas commented Sep 22, 2015

Why add another function? Can't we just ensure that the callback gets called once more after timer.stop(immediately = false)?

@zsxwing
Member Author

zsxwing commented Sep 22, 2015

> Why add another function? Can't we just ensure that the callback gets called once more after timer.stop(immediately = false)?

The semantics of `RecurringTimer` are to call `callback` every `period` milliseconds. It looks weird to call `callback` after stopping.

@tdas
Contributor

tdas commented Sep 22, 2015

Oh yes, it will wait for the right amount of time before calling the "callback" one last time.

@SparkQA

SparkQA commented Sep 22, 2015

Test build #42807 has finished for PR 8417 at commit a1c9354.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -99,6 +99,8 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
nextTime += period
logDebug("Callback for " + name + " called at time " + prevTime)
}
clock.waitTillTime(nextTime)
Contributor

Do we really need to wait till the next time? You could just take a parameter specifying whether the method should be called on stop or not, and then pass the current time in.

Member Author

> Do we really need to wait till the next time? You could just take a parameter specifying whether the method should be called on stop or not, and then pass the current time in.

I just don't want to break the semantics of `RecurringTimer`.

Contributor

Yes, the RecurringTimer is designed to call the function at a particular time interval; it should not deviate from that even for the last call.

Contributor

BTW, are you updating prevTime, which is returned by stop(), as the last time the call was made?
I think there aren't unit tests for this; could you add them so that this behavior is properly captured?
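
For illustration, such a test could look roughly like the sketch below (the threading and assertions here are assumptions for the sketch, not the suite that was actually added):

```scala
import org.apache.spark.streaming.util.RecurringTimer
import org.apache.spark.util.ManualClock

// Illustrative sketch only: check that a non-interrupting stop() fires the
// callback one last time and that stop() returns that final callback time.
val clock = new ManualClock()
val period = 100L
@volatile var lastCallbackTime = -1L
val timer = new RecurringTimer(clock, period, time => lastCallbackTime = time, "test")
timer.start(clock.getTimeMillis() + period)

// stop(interruptTimer = false) blocks until the final callback has run, so call
// it from another thread and keep advancing the manual clock from this one.
@volatile var returnedByStop = -1L
val stopper = new Thread {
  override def run(): Unit = { returnedByStop = timer.stop(interruptTimer = false) }
}
stopper.start()
while (stopper.isAlive) {
  clock.advance(period) // unblock clock.waitTillTime for the next interval
  Thread.sleep(10)
}
assert(lastCallbackTime > 0)
assert(returnedByStop == lastCallbackTime)
```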

@zsxwing
Member Author

zsxwing commented Sep 22, 2015

@tdas updated this PR. Could you take another look?

@SparkQA

SparkQA commented Sep 22, 2015

Test build #42837 has finished for PR 8417 at commit 193d191.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 22, 2015

Test build #42838 has finished for PR 8417 at commit 429728f.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}
@volatile var lastTime = -1L
// Now RecurringTimer is waiting for the next interval
val t = new Thread {
Contributor

super nit: t -> thread; I got confused with t being the timer.

@zsxwing
Member Author

zsxwing commented Sep 23, 2015

I removed `isStopped` since it's only for testing. I used `PrivateMethod` in unit tests to access `stopped` rather than adding `isStopped` in this PR.
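
For readers unfamiliar with that ScalaTest utility, a short sketch of the pattern (illustrative only; a real suite would live under the `org.apache.spark.streaming` package to reach the package-private timer):

```scala
import org.apache.spark.SparkFunSuite
import org.apache.spark.streaming.util.RecurringTimer
import org.apache.spark.util.ManualClock
import org.scalatest.PrivateMethodTester

// Sketch: read RecurringTimer's private `stopped` var reflectively instead of
// adding a public isStopped() method just for tests.
class RecurringTimerStopSketch extends SparkFunSuite with PrivateMethodTester {
  test("timer reports stopped after stop()") {
    val timer = new RecurringTimer(new ManualClock(), 100L, _ => (), "test")
    timer.stop(interruptTimer = true)
    val stoppedAccessor = PrivateMethod[Boolean](Symbol("stopped"))
    assert(timer invokePrivate stoppedAccessor())
  }
}
```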

@SparkQA

SparkQA commented Sep 23, 2015

Test build #42874 has finished for PR 8417 at commit e8e490d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Contributor

tdas commented Sep 23, 2015

LGTM. I am running the tests a few times to make sure that it's not flaky.

@tdas
Contributor

tdas commented Sep 23, 2015

Could you update the PR description to specify the actual solution?

@SparkQA

SparkQA commented Sep 23, 2015

Test build #1787 has finished for PR 8417 at commit e8e490d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 23, 2015

Test build #1786 has finished for PR 8417 at commit e8e490d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 23, 2015

Test build #1788 has finished for PR 8417 at commit e8e490d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 23, 2015

Test build #1789 has finished for PR 8417 at commit e8e490d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Member Author

zsxwing commented Sep 23, 2015

> Test build #1787 has finished for PR 8417 at commit e8e490d.
>
>   • This patch fails Spark unit tests.
>   • This patch merges cleanly.
>   • This patch adds no public classes.

This is not related to this patch. I fixed it in #8877.

@SparkQA

SparkQA commented Sep 23, 2015

Test build #1792 has finished for PR 8417 at commit e8e490d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 23, 2015

Test build #1790 has finished for PR 8417 at commit e8e490d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Contributor

tdas commented Sep 23, 2015

I am merging this to master and 1.5.

@SparkQA

SparkQA commented Sep 23, 2015

Test build #1791 has finished for PR 8417 at commit e8e490d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Sep 23, 2015
[SPARK-10224][Streaming] Fix the issue that blockIntervalTimer won't call updateCurrentBuffer when stopping

Author: zsxwing <zsxwing@gmail.com>

Closes #8417 from zsxwing/SPARK-10224.

(cherry picked from commit 44c28ab)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
@asfgit asfgit closed this in 44c28ab Sep 23, 2015
@zsxwing zsxwing deleted the SPARK-10224 branch September 23, 2015 12:43
ashangit pushed a commit to ashangit/spark that referenced this pull request Oct 19, 2016
[SPARK-10224][Streaming] Fix the issue that blockIntervalTimer won't call updateCurrentBuffer when stopping

Author: zsxwing <zsxwing@gmail.com>

Closes apache#8417 from zsxwing/SPARK-10224.

(cherry picked from commit 44c28ab)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
(cherry picked from commit 6a616d0)