-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-11761] Prevent the call to StreamingContext#stop() in the listener bus's thread #9741
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Test build #46001 has finished for PR 9741 at commit
|
Test build #46003 has finished for PR 9741 at commit
|
@tedyu please file a JIRA and put it in the title. I've reminded you many times in the past already. |
@andrewor14 Will pay attention next time. |
Test build #46009 has finished for PR 9741 at commit
|
@tedyu could you add a unit test for this fix? Otherwise LGTM |
yeah. I can merge this to 1.6 if you add a unit test. |
Please take a look at the test and see what should be improved. |
class StreamingContextStoppingCollector(val ssc: StreamingContext) extends StreamingListener { | ||
override def onOutputOperationStarted( | ||
outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = { | ||
ssc.stop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the listener bus will just log the exception. You can catch the exception here and use a field to store it. Then you can assert the exception in the test.
* A StreamingListener that calls StreamingContext.stop(). | ||
*/ | ||
class StreamingContextStoppingCollector(val ssc: StreamingContext) extends StreamingListener { | ||
var sparkExSeen = false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add Oh, it's not necessary since you check it after stopping. There must be some memory barrier already.@volatile
?
Test build #46134 has finished for PR 9741 at commit
|
LGTM |
Test build #46125 has finished for PR 9741 at commit
|
This an old commit. I will wait for the test on the latest commit to pass. Otherwise LGTM. |
Test build #46132 has finished for PR 9741 at commit
|
Test build #46136 has finished for PR 9741 at commit
|
For both 46132 and 46136:
|
retest this please |
DirectKafkaStreamSuite passed locally:
|
|
Jenkins, retest this please |
DirectKafkaStreamSuite failed in maven Jenkins: |
Test build #46158 has finished for PR 9741 at commit
|
I am merging this to master and 1.6. Thanks @tedyu! |
…ener bus's thread See discussion toward the tail of #9723 From zsxwing : ``` The user should not call stop or other long-time work in a listener since it will block the listener thread, and prevent from stopping SparkContext/StreamingContext. I cannot see an approach since we need to stop the listener bus's thread before stopping SparkContext/StreamingContext totally. ``` Proposed solution is to prevent the call to StreamingContext#stop() in the listener bus's thread. Author: tedyu <yuzhihong@gmail.com> Closes #9741 from tedyu/master. (cherry picked from commit 446738e) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
Should we do this in |
I agree. |
… bus's thread This is continuation of SPARK-11761 Andrew suggested adding this protection. See tail of #9741 Author: tedyu <yuzhihong@gmail.com> Closes #9852 from tedyu/master. (cherry picked from commit 8101254) Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
… bus's thread This is continuation of SPARK-11761 Andrew suggested adding this protection. See tail of #9741 Author: tedyu <yuzhihong@gmail.com> Closes #9852 from tedyu/master.
… bus's thread This is continuation of SPARK-11761 Andrew suggested adding this protection. See tail of apache/spark#9741 Author: tedyu <yuzhihong@gmail.com> Closes #9852 from tedyu/master.
See discussion toward the tail of #9723
From @zsxwing :
Proposed solution is to prevent the call to StreamingContext#stop() in the listener bus's thread.