Skip to content

[SPARK-11761] Prevent the call to StreamingContext#stop() in the listener bus's thread #9741

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from

Conversation

tedyu
Copy link
Contributor

@tedyu tedyu commented Nov 16, 2015

See discussion toward the tail of #9723
From @zsxwing :

The user should not call stop or other long-time work in a listener since it will block the listener thread, and prevent from stopping SparkContext/StreamingContext.

I cannot see an approach since we need to stop the listener bus's thread before stopping SparkContext/StreamingContext totally.

Proposed solution is to prevent the call to StreamingContext#stop() in the listener bus's thread.

@tedyu
Copy link
Contributor Author

tedyu commented Nov 16, 2015

@tdas @zsxwing
Please take a look

@SparkQA
Copy link

SparkQA commented Nov 16, 2015

Test build #46001 has finished for PR 9741 at commit bc40285.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 16, 2015

Test build #46003 has finished for PR 9741 at commit 8f583b9.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14
Copy link
Contributor

@tedyu please file a JIRA and put it in the title. I've reminded you many times in the past already.

@tedyu tedyu changed the title Prevent the call to StreamingContext#stop() in the listener bus's thread [SPARK-11761] Prevent the call to StreamingContext#stop() in the listener bus's thread Nov 16, 2015
@tedyu
Copy link
Contributor Author

tedyu commented Nov 16, 2015

@andrewor14
Was pulled into a meeting when filing the JIRA.

Will pay attention next time.

@SparkQA
Copy link

SparkQA commented Nov 16, 2015

Test build #46009 has finished for PR 9741 at commit abab461.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tedyu
Copy link
Contributor Author

tedyu commented Nov 17, 2015

@tdas @zsxwing
Any feedback ?

@zsxwing
Copy link
Member

zsxwing commented Nov 17, 2015

@tedyu could you add a unit test for this fix? Otherwise LGTM

@tdas
Copy link
Contributor

tdas commented Nov 17, 2015

yeah. I can merge this to 1.6 if you add a unit test.

@tedyu
Copy link
Contributor Author

tedyu commented Nov 18, 2015

Please take a look at the test and see what should be improved.

class StreamingContextStoppingCollector(val ssc: StreamingContext) extends StreamingListener {
override def onOutputOperationStarted(
outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {
ssc.stop()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the listener bus will just log the exception. You can catch the exception here and use a field to store it. Then you can assert the exception in the test.

* A StreamingListener that calls StreamingContext.stop().
*/
class StreamingContextStoppingCollector(val ssc: StreamingContext) extends StreamingListener {
var sparkExSeen = false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add @volatile? Oh, it's not necessary since you check it after stopping. There must be some memory barrier already.

@SparkQA
Copy link

SparkQA commented Nov 18, 2015

Test build #46134 has finished for PR 9741 at commit 8ff4c61.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Copy link
Member

zsxwing commented Nov 18, 2015

LGTM

@SparkQA
Copy link

SparkQA commented Nov 18, 2015

Test build #46125 has finished for PR 9741 at commit d67a133.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Copy link
Contributor

tdas commented Nov 18, 2015

This an old commit. I will wait for the test on the latest commit to pass. Otherwise LGTM.

@SparkQA
Copy link

SparkQA commented Nov 18, 2015

Test build #46132 has finished for PR 9741 at commit e0c6163.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 18, 2015

Test build #46136 has finished for PR 9741 at commit 922bf0b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tedyu
Copy link
Contributor Author

tedyu commented Nov 18, 2015

For both 46132 and 46136:

[info] StreamingListenerSuite:
[info] - batch info reporting (752 milliseconds)
[info] - receiver info reporting (369 milliseconds)
[info] - output operation reporting (318 milliseconds)
[info] - don't call ssc.stop in listener (962 milliseconds)
[info] - onBatchCompleted with successful batch (1 second, 12 milliseconds)

@zsxwing
Copy link
Member

zsxwing commented Nov 18, 2015

retest this please

@tedyu
Copy link
Contributor Author

tedyu commented Nov 18, 2015

DirectKafkaStreamSuite passed locally:

Run starting. Expected test count is: 6
DirectKafkaStreamSuite:
 - basic stream receiving with multiple topics and smallest starting offset
- receiving from largest starting offset
- creating stream by offset
- offset recovery
- Direct Kafka stream report input information
- using rate controller
Run completed in 37 seconds, 464 milliseconds.

@tedyu
Copy link
Contributor Author

tedyu commented Nov 18, 2015

ERROR: Timeout after 15 minutes
ERROR: Error fetching remote repo 'origin'
hudson.plugins.git.GitException: Failed to fetch from https://github.com/apache/spark.git
    at hudson.plugins.git.GitSCM.fetchFrom(GitSCM.java:763)

@tedyu
Copy link
Contributor Author

tedyu commented Nov 18, 2015

Jenkins, retest this please

@tedyu
Copy link
Contributor Author

tedyu commented Nov 18, 2015

@SparkQA
Copy link

SparkQA commented Nov 18, 2015

Test build #46158 has finished for PR 9741 at commit 922bf0b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Copy link
Contributor

tdas commented Nov 18, 2015

I am merging this to master and 1.6. Thanks @tedyu!

asfgit pushed a commit that referenced this pull request Nov 18, 2015
…ener bus's thread

See discussion toward the tail of #9723
From zsxwing :
```
The user should not call stop or other long-time work in a listener since it will block the listener thread, and prevent from stopping SparkContext/StreamingContext.

I cannot see an approach since we need to stop the listener bus's thread before stopping SparkContext/StreamingContext totally.
```
Proposed solution is to prevent the call to StreamingContext#stop() in the listener bus's thread.

Author: tedyu <yuzhihong@gmail.com>

Closes #9741 from tedyu/master.

(cherry picked from commit 446738e)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
@asfgit asfgit closed this in 446738e Nov 18, 2015
@andrewor14
Copy link
Contributor

Should we do this in SparkContext as well? The user can still do sc.stop() on job end or something.

@tedyu
Copy link
Contributor Author

tedyu commented Nov 20, 2015

I agree.
New PR coming

asfgit pushed a commit that referenced this pull request Nov 24, 2015
… bus's thread

This is continuation of SPARK-11761

Andrew suggested adding this protection. See tail of #9741

Author: tedyu <yuzhihong@gmail.com>

Closes #9852 from tedyu/master.

(cherry picked from commit 8101254)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
asfgit pushed a commit that referenced this pull request Nov 24, 2015
… bus's thread

This is continuation of SPARK-11761

Andrew suggested adding this protection. See tail of #9741

Author: tedyu <yuzhihong@gmail.com>

Closes #9852 from tedyu/master.
kiszk pushed a commit to kiszk/spark-gpu that referenced this pull request Dec 26, 2015
… bus's thread

This is continuation of SPARK-11761

Andrew suggested adding this protection. See tail of apache/spark#9741

Author: tedyu <yuzhihong@gmail.com>

Closes #9852 from tedyu/master.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants