
[SPARK-23894][CORE][SQL] Defensively clear ActiveSession in Executors #21185


Closed
wants to merge 2 commits

Conversation

squito
Contributor

@squito squito commented Apr 27, 2018

Because SparkSession.getActiveSession uses an InheritableThreadLocal,
the ThreadPool in executors might end up inheriting the SparkSession
from a driver thread. This leads to test failures as executors should
never have an active SparkSession. So in local mode, defensively clear
the active session.

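The mechanism can be seen with a plain InheritableThreadLocal standing in for SparkSession's active-session holder (a minimal sketch, not Spark code):

```scala
import java.util.concurrent.Executors

// Stand-in for SparkSession's active-session holder.
val activeSession = new InheritableThreadLocal[String]

// The "driver" thread sets an active session...
activeSession.set("driver-session")

// ...and later creates an executor-style thread pool. A pool thread is a
// child of the thread that created it, so it copies the inheritable value
// when it is constructed, even though it never called set() itself.
val pool = Executors.newFixedThreadPool(1)
var inherited: String = null
pool.submit(new Runnable {
  override def run(): Unit = { inherited = activeSession.get() }
}).get()
pool.shutdown()

println(inherited) // prints "driver-session": the pool thread inherited it
```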
@SparkQA

SparkQA commented Apr 28, 2018

Test build #89939 has finished for PR 21185 at commit 2a4944f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* Only in local mode, we have to prevent the driver from setting the active SparkSession
* in the executor threads. See SPARK-23894.
*/
lazy val clearActiveSparkSessionMethod = if (Utils.isLocalMaster(conf)) {
Contributor


private?

@@ -299,6 +316,9 @@ private[spark] class Executor(
Thread.currentThread.setContextClassLoader(replClassLoader)
val ser = env.closureSerializer.newInstance()
logInfo(s"Running $taskName (TID $taskId)")
// When running in local mode, we might end up with the active session from the driver set on
// this thread, though we never should, so we defensively clear it. See SPARK-23894.
clearActiveSparkSessionMethod.foreach(_.invoke(null))
Contributor


Can this be done in the thread pool's thread factory instead?

Contributor Author


ThreadFactories only create threads; they don't start them. So you can't do it directly there, since I need this to run inside the thread.

I thought about trying to do something fancier, like having the thread pool also start the thread and submit one runnable to do this, or having threads get promoted to the correct thread pool after this ran, but I felt like it was too error-prone for minimal gain.
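The approach the patch takes can be sketched with a plain InheritableThreadLocal in place of the real clearActiveSession reflection call (names here are illustrative, not Spark's):

```scala
import java.util.concurrent.Executors

val activeSession = new InheritableThreadLocal[String]
activeSession.set("driver-session") // set on the "driver" thread

val pool = Executors.newFixedThreadPool(1)
var beforeClear: String = null
var afterClear: String = null

// Clearing in a ThreadFactory would not work: the inherited copy is made in
// the Thread constructor, and a remove() inside newThread() executes on the
// parent thread, not the new one. So the clear runs as the first thing the
// task does on the pool thread.
pool.submit(new Runnable {
  override def run(): Unit = {
    beforeClear = activeSession.get() // inherited from the parent thread
    activeSession.remove()            // defensive clear, inside the thread
    afterClear = activeSession.get()  // null after the clear
  }
}).get()
pool.shutdown()
```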

@cloud-fan
Contributor

I think SparkSession is driver-only; how do we access it in an executor?

@squito
Contributor Author

squito commented Apr 30, 2018

@cloud-fan

I think SparkSession is driver-only; how do we access it in an executor?

That's the whole problem. It's only meant to be available on the driver, but it ends up getting set on the executor when running in local mode. Because it uses an inheritable thread local, when the executor thread pool creates a new thread, that thread inherits the active session of the driver.

In cluster mode, when the executor is a totally separate JVM, there is no problem.

@vanzin
Contributor

vanzin commented Apr 30, 2018

Tests haven't triggered, weird. LGTM pending tests.

@vanzin
Contributor

vanzin commented Apr 30, 2018

retest this please

@SparkQA

SparkQA commented May 1, 2018

Test build #4160 has finished for PR 21185 at commit 49783c5.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

cc @ericl too

@ericl
Contributor

ericl commented May 1, 2018

This makes sense to me. It would be slightly cleaner to clear it where the session is getting leaked through threads, but if that's hard then this looks good.

@squito
Contributor Author

squito commented May 1, 2018

Jenkins, retest this please

@SparkQA

SparkQA commented May 1, 2018

Test build #4161 has started for PR 21185 at commit 49783c5.

@gatorsmile
Member

LGTM pending Jenkins

Contributor

@jiangxb1987 jiangxb1987 left a comment


LGTM. This should resolve the flakiness of BucketedWriteWithoutHiveSupportSuite, though we shall still try to find the place where the Thread is actually leaked.

@jiangxb1987
Contributor

retest this please

@gatorsmile
Member

ok to test

@gatorsmile
Member

test this please

@squito
Contributor Author

squito commented May 1, 2018

I don't understand the comments about "leaked threads". The executor thread pool is allowed to create threads whenever it wants. You can play around with this example if you like: https://gist.github.com/squito/8fc6533b1eeeb48559302c5898ae2c1d

The only "leak" here is potentially using an InheritableThreadLocal for the activeSession at all.
SparkSession.getActiveSession is a public API, so we probably don't want to change the thread-inheritance behavior it currently has. You could change SparkSession.getActiveSession to implement the same inheritance itself, walking up the parent threads but stopping if it hits an executor thread. But that seems more complex for no gain.

@SparkQA

SparkQA commented May 1, 2018

Test build #89978 has finished for PR 21185 at commit 49783c5.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jkbradley
Member

jkbradley commented May 1, 2018

There have been several of these R test failures. May be from flakiness with CRAN; testing locally now (since I didn't see any recent bad commits in R).

@cloud-fan
Contributor

That's the whole problem. It's only meant to be available on the driver, but it ends up getting set on the executor when running in local mode.

Yea, I think this is the root cause. I'm making a PR to ban SQLConf.get on the executor side; shall we do the same thing for SparkSession? And fix all the places that mistakenly access SparkSession in executors.

@squito
Contributor Author

squito commented May 2, 2018

Yea, I think this is the root cause. I'm making a PR to ban SQLConf.get on the executor side; shall we do the same thing for SparkSession? And fix all the places that mistakenly access SparkSession in executors.

Ah, that makes sense. All the failures I observed with the executor accessing the active session were actually via SQLConf.get. I don't know of any others.

I think it makes sense to actively ban SparkSession.getActiveSession the same way. I can include that in this PR, or make another one (or I guess you can put it into your PR), whatever you prefer. I don't think there will be any existing uses that need to be fixed, as I'm pretty sure if you did access it, you'd get the exception we've seen in tests related to the listener bus:

08:36:34.694 Executor task launch worker for task 436 ERROR Executor: Exception in task 0.0 in stage 402.0 (TID 436)
java.lang.IllegalStateException: LiveListenerBus is stopped.
        at org.apache.spark.scheduler.LiveListenerBus.addToQueue(LiveListenerBus.scala:97)
        at org.apache.spark.scheduler.LiveListenerBus.addToStatusQueue(LiveListenerBus.scala:80)
        at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:93)
        at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:117)
...
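Such an executor-side ban might look roughly like this sketch (ActiveSessionGuard and the name-based check are illustrative stand-ins, not the actual change; the thread name keys off the "Executor task launch worker" naming seen in the trace above):

```scala
// Hypothetical fail-fast guard: refuse to return a session on an executor
// thread instead of silently handing back an inherited one. A real
// implementation would use a proper executor flag, not the thread name.
object ActiveSessionGuard {
  private val active = new InheritableThreadLocal[String]

  private def onExecutorThread: Boolean =
    Thread.currentThread().getName.startsWith("Executor task launch worker")

  def set(session: String): Unit = active.set(session)

  def get: Option[String] = {
    require(!onExecutorThread,
      "SparkSession should only be accessed on the driver")
    Option(active.get())
  }
}
```

A driver thread still gets the session back as before, while a thread named like an executor worker gets an exception pointing at the misuse instead of a leaked session.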

@SparkQA

SparkQA commented May 2, 2018

Test build #4169 has finished for PR 21185 at commit 49783c5.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor Author

squito commented May 2, 2018

@jkbradley still seeing flakiness in R tests, in other PRs too. I'm not even sure how to interpret the failure. Is it safe to ignore those? I can't see how this would be affecting R tests ...

* Only in local mode, we have to prevent the driver from setting the active SparkSession
* in the executor threads. See SPARK-23894.
*/
private lazy val clearActiveSparkSessionMethod = if (Utils.isLocalMaster(conf)) {
Member


Do we still need to call this if we can resolve the root cause in #21190, and issue an error promptly like what you said in #21185 (comment)?

Contributor


I've added this check in #21190

Contributor Author


agreed, I closed this PR

@squito
Copy link
Contributor Author

squito commented May 8, 2018

I'm closing this in favor of #21190

@squito squito closed this May 8, 2018