-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-23894][CORE][SQL] Defensively clear ActiveSession in Executors #21185
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Because SparkSession.getActiveSession uses an InheritableThreadLocal, the ThreadPool in executors might end up inheriting the SparkSession from a driver thread. This leads to test failures as executors should never have an active SparkSession. So in local mode, defensively clear the active session.
Test build #89939 has finished for PR 21185 at commit
|
* Only in local mode, we have to prevent the driver from setting the active SparkSession | ||
* in the executor threads. See SPARK-23894. | ||
*/ | ||
lazy val clearActiveSparkSessionMethod = if (Utils.isLocalMaster(conf)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private?
@@ -299,6 +316,9 @@ private[spark] class Executor( | |||
Thread.currentThread.setContextClassLoader(replClassLoader) | |||
val ser = env.closureSerializer.newInstance() | |||
logInfo(s"Running $taskName (TID $taskId)") | |||
// When running in local mode, we might end up with the active session from the driver set on | |||
// this thread, though we never should, so we defensively clear it. See SPARK-23894. | |||
clearActiveSparkSessionMethod.foreach(_.invoke(null)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be done in the thread pool's thread factory instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ThreadFactories only create threads, they don't start them, so you can't do it directly there, as I need this to run inside the thread.
I thought about trying to do something fancier, like having the thread pool also start the thread and submit one runnable to do this, or having threads get promoted to the correct thread pool after this ran, but I felt like it was too error-prone for minimal gain.
I think |
that's the whole problem. Its only meant to be available on the driver, but it ends up getting set on the executor when running in local mode. Because it uses an inheritable thread local, when the executor thread pool creates a new thread, the executor thread ends up inheriting the active session of the driver. In cluster mode, when the executor is a totally separate JVM, there is no problem. |
Tests haven't triggered, weird. LGTM pending tests. |
retest this please |
Test build #4160 has finished for PR 21185 at commit
|
cc @ericl too |
This makes sense to me. It would be slightly to clear it where where the session is getting leaked through threads, but if that's hard then this looks good. |
Jenkins, retest this please |
Test build #4161 has started for PR 21185 at commit |
LGTM pending Jenkins |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. This should resolve the flakiness of BucketedWriteWithoutHiveSupportSuite
, though we shall still try to find the place where the Thread is actually leaked.
retest this please |
ok to test |
test this please |
I don't understand the comments about "leaked threads". the executor thread pool is allowed to create threads whenever it wants. You can play around with this example if you like: https://gist.github.com/squito/8fc6533b1eeeb48559302c5898ae2c1d The only "leak" here is potentially using an InheritableThreadLocal for the activeSession at all. |
Test build #89978 has finished for PR 21185 at commit
|
There have been several of these R test failures. May be from flakiness with CRAN; testing locally now (since I didn't see any recent bad commits in R). |
Yea I think this is the root cause. I'm making a PR to ban |
ah that makes sense. All the failures I observed w/ the executor accessing the active session were actually via I think it makes sense to actively ban
|
Test build #4169 has finished for PR 21185 at commit
|
@jkbradley still seeing flakiness in R tests, in other PRs too. I'm not even sure how to interpret the failure. is it safe to ignore those? I can't see how this would be effecting R tests ... |
* Only in local mode, we have to prevent the driver from setting the active SparkSession | ||
* in the executor threads. See SPARK-23894. | ||
*/ | ||
private lazy val clearActiveSparkSessionMethod = if (Utils.isLocalMaster(conf)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we still need to call this if we can resolve the root cause in #21190, and issue an error promptly like what you said in #21185 (comment)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added this check in #21190
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agreed, I closed this PR
I'm closing this in favor of #21190 |
Because SparkSession.getActiveSession uses an InheritableThreadLocal,
the ThreadPool in executors might end up inheriting the SparkSession
from a driver thread. This leads to test failures as executors should
never have an active SparkSession. So in local mode, defensively clear
the active session.