-
Notifications
You must be signed in to change notification settings - Fork 28.7k
[SPARK-10224][Streaming]Fix the issue that blockIntervalTimer won't call updateCurrentBuffer when stopping #8417
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@@ -184,9 +184,10 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter { | |||
// Verify that the final data is present in the final generated block and | |||
// pushed before complete stop | |||
assert(blockGenerator.isStopped() === false) // generator has not stopped yet | |||
clock.advance(blockIntervalMs) // force block generation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a race condition here. If updateCurrentBuffer
finishes before blockIntervalTimer.stop(interruptTimer = false)
, this test will fail because RecurringTimer is blocking in this line:
spark/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
Line 96 in 69c9c17
clock.waitTillTime(nextTime) |
Therefore I called clock.advance(blockIntervalMs)
in every trial to make sure we wont' block in clock.waitTillTime
.
Test build #41534 has finished for PR 8417 at commit
|
@tdas let me try to use the following table to explain the issue. Assume
After step 9, |
var isCurrentBufferEmpty = synchronized { currentBuffer.isEmpty } | ||
while(!isCurrentBufferEmpty) { | ||
Thread.sleep(blockIntervalMs) | ||
isCurrentBufferEmpty = synchronized { currentBuffer.isEmpty } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A better fix would be to ensure that updateCurrentBuffer
is called when blockIntervalTimer
is called. This can be done by simply adding a parameter to RecurringTimer
to ensure that the callback is made in the stop
method.
The advantage is that we won't be burning CPU here.
@zsxwing I got the race condition already, but thank for adding that table, makes it very very obvious. Regarding solution, I think I agree with @harishreedharan at a high level. It is better to ensure that the existing assumption on RecurringTimer is better met. There are number of uses of RecurringTimer and its best if that gives more guarantees, rather than higher layers work around its limitations. So could you make sure the it is guaranteed that RecurringTimer will call the function one more time after stop is called? |
How about adding an
So that we could do different work when stopping |
Why add another function? Cant we just ensure that the |
The semantics of |
Oh yes, it will wait for the right amount of time before calling the "callback" one last time. |
Test build #42807 has finished for PR 8417 at commit
|
@@ -99,6 +99,8 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: | |||
nextTime += period | |||
logDebug("Callback for " + name + " called at time " + prevTime) | |||
} | |||
clock.waitTillTime(nextTime) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need to wait till the next time? You could just take a parameter specifying whether the method should be called on stop or not, and then pass the current time in.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need to wait till the next time? You could just take a parameter specifying whether the method should be called on stop or not, and then pass the current time in.
I just want to make it not break the semantics of RecurringTimer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the RecurringTimer is designed to call the function at particular time interval, it should not deviate from that even for the last call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, are you updating the prevTime
which is returned by stop()
as the last time the call was made?
I think there arent unit tests for this, could you add them, so that this behavior is properly captured.
@tdas updated this PR. Could you take another look? |
Test build #42837 has finished for PR 8417 at commit
|
Test build #42838 has finished for PR 8417 at commit
|
} | ||
@volatile var lastTime = -1L | ||
// Now RecurringTimer is waiting for the next interval | ||
val t = new Thread { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
super nit: t
-> thread
, got confused with t
being timer.
I removed |
Test build #42874 has finished for PR 8417 at commit
|
LGTM. I am running the tests a few times to make sure that this its not flaky. |
Could you update the solution in the PR text to specify the actual solution. |
Test build #1787 has finished for PR 8417 at commit
|
Test build #1786 has finished for PR 8417 at commit
|
Test build #1788 has finished for PR 8417 at commit
|
Test build #1789 has finished for PR 8417 at commit
|
Test build #1792 has finished for PR 8417 at commit
|
Test build #1790 has finished for PR 8417 at commit
|
I am merging this to master and 1.5. |
Test build #1791 has finished for PR 8417 at commit
|
… call updateCurrentBuffer when stopping `blockIntervalTimer.stop(interruptTimer = false)` doesn't guarantee calling `updateCurrentBuffer`. So it's possible that `blockIntervalTimer` will exit when `updateCurrentBuffer` is not empty. Then the data in `currentBuffer` will be lost. To reproduce it, you can add `Thread.sleep(200)` in this line (https://github.com/apache/spark/blob/69c9c177160e32a2fbc9b36ecc52156077fca6fc/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala#L100) and run `StreamingContexSuite`. I cannot write a unit test to reproduce it because I cannot find an approach to force `RecurringTimer` suspend at this line for a few milliseconds. There was a failure in Jenkins here: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/41455/console This PR updates RecurringTimer to make sure `stop(interruptTimer = false)` will call `callback` at least once after the `stop` method is called. Author: zsxwing <zsxwing@gmail.com> Closes #8417 from zsxwing/SPARK-10224. (cherry picked from commit 44c28ab) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
… call updateCurrentBuffer when stopping `blockIntervalTimer.stop(interruptTimer = false)` doesn't guarantee calling `updateCurrentBuffer`. So it's possible that `blockIntervalTimer` will exit when `updateCurrentBuffer` is not empty. Then the data in `currentBuffer` will be lost. To reproduce it, you can add `Thread.sleep(200)` in this line (https://github.com/apache/spark/blob/69c9c177160e32a2fbc9b36ecc52156077fca6fc/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala#L100) and run `StreamingContexSuite`. I cannot write a unit test to reproduce it because I cannot find an approach to force `RecurringTimer` suspend at this line for a few milliseconds. There was a failure in Jenkins here: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/41455/console This PR updates RecurringTimer to make sure `stop(interruptTimer = false)` will call `callback` at least once after the `stop` method is called. Author: zsxwing <zsxwing@gmail.com> Closes apache#8417 from zsxwing/SPARK-10224. (cherry picked from commit 44c28ab) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com> (cherry picked from commit 6a616d0)
blockIntervalTimer.stop(interruptTimer = false)
doesn't guarantee callingupdateCurrentBuffer
. So it's possible thatblockIntervalTimer
will exit whenupdateCurrentBuffer
is not empty. Then the data incurrentBuffer
will be lost.To reproduce it, you can add
Thread.sleep(200)
in this line (spark/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
Line 100 in 69c9c17
StreamingContexSuite
.I cannot write a unit test to reproduce it because I cannot find an approach to force
RecurringTimer
suspend at this line for a few milliseconds.There was a failure in Jenkins here: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/41455/console
This PR updates RecurringTimer to make sure
stop(interruptTimer = false)
will callcallback
at least once after thestop
method is called.