Skip to content

SPARK-2034. KafkaInputDStream doesn't close resources and may prevent JVM shutdown #980

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from

Conversation

srowen
Copy link
Member

@srowen srowen commented Jun 5, 2014

Tobias noted today on the mailing list:

I am trying to use Spark Streaming with Kafka, which works like a
charm – except for shutdown. When I run my program with "sbt
run-main", sbt will never exit, because there are two non-daemon
threads left that don't die.
I created a minimal example at
https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-kafkadoesntshutdown-scala.
It starts a StreamingContext and does nothing more than connecting to
a Kafka server and printing what it receives. Using the future Unknown macro: { ... } construct, I shut down the StreamingContext after some seconds and
then print the difference between the threads at start time and at end
time. The output can be found at
https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-output1.
There are a number of threads remaining that will prevent sbt from
exiting.
When I replace KafkaUtils.createStream(...) with a call that does
exactly the same, except that it calls consumerConnector.shutdown()
in KafkaReceiver.onStop() (which it should, IMO), the output is as
shown at https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-output2.
Does anyone have any idea what is going on here and why the program
doesn't shut down properly? The behavior is the same with both kafka
0.8.0 and 0.8.1.1, by the way.

Something similar was noted last year:

http://mail-archives.apache.org/mod_mbox/spark-dev/201309.mbox/%3C1380220041.2428.YahooMailNeo@web160804.mail.bf1.yahoo.com%3E

KafkaInputDStream doesn't close ConsumerConnector in onStop(), and does not close the Executor it creates. The latter leaves non-daemon threads and can prevent the JVM from shutting down even if streaming is closed properly.

… is created so that its threads stop when done; close the Zookeeper client even on exception; fix a few typos; log exceptions that otherwise vanish
@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished.

@AmplabJenkins
Copy link

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15477/

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished. All automated tests passed.

@AmplabJenkins
Copy link

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15478/

@srowen
Copy link
Member Author

srowen commented Jun 18, 2014

Pardon, could I ping this issue for review and consideration for commit? I think it's a clean fix and improvement.

// Start the messages handler for each partition
topicMessageStreams.values.foreach { streams =>
streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
val executorPool = Executors.newFixedThreadPool(topics.values.sum)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor - but to avoid a name collision with Spark's own Executor we usually try call variables like this threadPool.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that actually you didn't add this name, so nevermind!

@pwendell
Copy link
Contributor

LGTM pending tests. Jenkins retest this please.

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished. All automated tests passed.

@AmplabJenkins
Copy link

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16008/

asfgit pushed a commit that referenced this pull request Jun 22, 2014
… JVM shutdown

Tobias noted today on the mailing list:

========

I am trying to use Spark Streaming with Kafka, which works like a
charm – except for shutdown. When I run my program with "sbt
run-main", sbt will never exit, because there are two non-daemon
threads left that don't die.
I created a minimal example at
<https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-kafkadoesntshutdown-scala>.
It starts a StreamingContext and does nothing more than connecting to
a Kafka server and printing what it receives. Using the `future
Unknown macro: { ... }
` construct, I shut down the StreamingContext after some seconds and
then print the difference between the threads at start time and at end
time. The output can be found at
<https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-output1>.
There are a number of threads remaining that will prevent sbt from
exiting.
When I replace `KafkaUtils.createStream(...)` with a call that does
exactly the same, except that it calls `consumerConnector.shutdown()`
in `KafkaReceiver.onStop()` (which it should, IMO), the output is as
shown at <https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-output2>.
Does anyone have any idea what is going on here and why the program
doesn't shut down properly? The behavior is the same with both kafka
0.8.0 and 0.8.1.1, by the way.

========

Something similar was noted last year:

http://mail-archives.apache.org/mod_mbox/spark-dev/201309.mbox/%3C1380220041.2428.YahooMailNeo@web160804.mail.bf1.yahoo.com%3E

KafkaInputDStream doesn't close `ConsumerConnector` in `onStop()`, and does not close the `Executor` it creates. The latter leaves non-daemon threads and can prevent the JVM from shutting down even if streaming is closed properly.

Author: Sean Owen <sowen@cloudera.com>

Closes #980 from srowen/SPARK-2034 and squashes the following commits:

9f31a8d [Sean Owen] Restore ClassTag to private class because MIMA flags it; is the shadowing intended?
2d579a8 [Sean Owen] Close ConsumerConnector in onStop; shutdown() the local Executor that is created so that its threads stop when done; close the Zookeeper client even on exception; fix a few typos; log exceptions that otherwise vanish
(cherry picked from commit 476581e)

Signed-off-by: Patrick Wendell <pwendell@gmail.com>
@asfgit asfgit closed this in 476581e Jun 22, 2014
@srowen srowen deleted the SPARK-2034 branch June 22, 2014 11:01
pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
… JVM shutdown

Tobias noted today on the mailing list:

========

I am trying to use Spark Streaming with Kafka, which works like a
charm – except for shutdown. When I run my program with "sbt
run-main", sbt will never exit, because there are two non-daemon
threads left that don't die.
I created a minimal example at
<https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-kafkadoesntshutdown-scala>.
It starts a StreamingContext and does nothing more than connecting to
a Kafka server and printing what it receives. Using the `future
Unknown macro: { ... }
` construct, I shut down the StreamingContext after some seconds and
then print the difference between the threads at start time and at end
time. The output can be found at
<https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-output1>.
There are a number of threads remaining that will prevent sbt from
exiting.
When I replace `KafkaUtils.createStream(...)` with a call that does
exactly the same, except that it calls `consumerConnector.shutdown()`
in `KafkaReceiver.onStop()` (which it should, IMO), the output is as
shown at <https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-output2>.
Does anyone have any idea what is going on here and why the program
doesn't shut down properly? The behavior is the same with both kafka
0.8.0 and 0.8.1.1, by the way.

========

Something similar was noted last year:

http://mail-archives.apache.org/mod_mbox/spark-dev/201309.mbox/%3C1380220041.2428.YahooMailNeo@web160804.mail.bf1.yahoo.com%3E

KafkaInputDStream doesn't close `ConsumerConnector` in `onStop()`, and does not close the `Executor` it creates. The latter leaves non-daemon threads and can prevent the JVM from shutting down even if streaming is closed properly.

Author: Sean Owen <sowen@cloudera.com>

Closes apache#980 from srowen/SPARK-2034 and squashes the following commits:

9f31a8d [Sean Owen] Restore ClassTag to private class because MIMA flags it; is the shadowing intended?
2d579a8 [Sean Owen] Close ConsumerConnector in onStop; shutdown() the local Executor that is created so that its threads stop when done; close the Zookeeper client even on exception; fix a few typos; log exceptions that otherwise vanish
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
… JVM shutdown

Tobias noted today on the mailing list:

========

I am trying to use Spark Streaming with Kafka, which works like a
charm – except for shutdown. When I run my program with "sbt
run-main", sbt will never exit, because there are two non-daemon
threads left that don't die.
I created a minimal example at
<https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-kafkadoesntshutdown-scala>.
It starts a StreamingContext and does nothing more than connecting to
a Kafka server and printing what it receives. Using the `future
Unknown macro: { ... }
` construct, I shut down the StreamingContext after some seconds and
then print the difference between the threads at start time and at end
time. The output can be found at
<https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-output1>.
There are a number of threads remaining that will prevent sbt from
exiting.
When I replace `KafkaUtils.createStream(...)` with a call that does
exactly the same, except that it calls `consumerConnector.shutdown()`
in `KafkaReceiver.onStop()` (which it should, IMO), the output is as
shown at <https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-output2>.
Does anyone have any idea what is going on here and why the program
doesn't shut down properly? The behavior is the same with both kafka
0.8.0 and 0.8.1.1, by the way.

========

Something similar was noted last year:

http://mail-archives.apache.org/mod_mbox/spark-dev/201309.mbox/%3C1380220041.2428.YahooMailNeo@web160804.mail.bf1.yahoo.com%3E

KafkaInputDStream doesn't close `ConsumerConnector` in `onStop()`, and does not close the `Executor` it creates. The latter leaves non-daemon threads and can prevent the JVM from shutting down even if streaming is closed properly.

Author: Sean Owen <sowen@cloudera.com>

Closes apache#980 from srowen/SPARK-2034 and squashes the following commits:

9f31a8d [Sean Owen] Restore ClassTag to private class because MIMA flags it; is the shadowing intended?
2d579a8 [Sean Owen] Close ConsumerConnector in onStop; shutdown() the local Executor that is created so that its threads stop when done; close the Zookeeper client even on exception; fix a few typos; log exceptions that otherwise vanish
wangyum pushed a commit that referenced this pull request May 26, 2023
…ted scan dynamically (#980)

* [CARMEL-5997][FOLLOW UP] Support more sql patterns for deciding bucketed scan dynamically

* fix ut
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants