[SPARK-3762] clear reference of SparkEnv after stop #2624

Closed
davies wants to merge 4 commits from davies/env

Conversation

davies (Contributor) commented Oct 2, 2014

SparkEnv is cached in a ThreadLocal object, so after stopping one SparkContext and creating a new one, the old SparkEnv is still used by some threads. This triggers many problems; for example, PySpark breaks after a SparkContext restart because Py4J uses a thread pool for RPC.

This patch clears all of these references after a SparkEnv is stopped.
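Concretely, the intent is roughly the following (a minimal sketch with approximate field names, not the exact diff):

    // Sketch only: field names (env, lastSetSparkEnv) approximate the ones in
    // SparkEnv.scala at the time; the real patch may differ in detail.
    private[spark] def stop(): Unit = {
      // ... shut down actor system, block manager, shuffle/broadcast managers ...
      SparkEnv.env.remove()            // clear this thread's ThreadLocal copy
      SparkEnv.lastSetSparkEnv = null  // clear the global "last set" reference
    }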

cc @mateiz @tdas @pwendell

SparkQA commented Oct 2, 2014

QA tests have started for PR 2624 at commit 4d0ea8b.

  • This patch merges cleanly.

SparkQA commented Oct 2, 2014

QA tests have finished for PR 2624 at commit 4d0ea8b.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

AmplabJenkins commented:

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21184/

@@ -130,6 +133,12 @@ object SparkEnv extends Logging {
env.set(e)
}

// clear all the threadlocal references
Review comment (Contributor):
What about threads that have called SparkEnv.set(SparkEnv.get)? I don't think those ThreadLocals will get cleared.
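To illustrate the concern with a standalone, hypothetical snippet (not Spark code): clearing a ThreadLocal from one thread does not touch the copies other threads have already cached, so a pooled thread that called SparkEnv.set(SparkEnv.get) would keep its stale reference.

    // Hypothetical demo: the worker thread's cached value survives a remove()
    // performed on the main thread.
    object ThreadLocalDemo extends App {
      val local = new ThreadLocal[String]
      val worker = new Thread(new Runnable {
        def run(): Unit = {
          local.set("stale value")   // like SparkEnv.set(SparkEnv.get)
          Thread.sleep(200)
          println(local.get())       // still prints "stale value"
        }
      })
      worker.start()
      Thread.sleep(100)
      local.remove()                 // only clears the main thread's slot
      worker.join()
    }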

JoshRosen (Contributor) commented:

It looks like SparkEnv has two "getter" methods: get, which returns either the ThreadLocal value or the last SparkEnv set by any thread, and getThreadLocal, which just reads the current ThreadLocal.

It turns out that there's no code which calls getThreadLocal. We call SparkEnv.set() in a few places, but we seem to always set it using either a new SparkEnv (e.g. when starting a SparkContext or Executor) or using the value obtained from SparkEnv.get(). Therefore, it seems like SparkEnv effectively acts as a global object. Why not just remove the ThreadLocals and have get just return some @volatile field in SparkEnv that holds the current instance (a sort of Singleton pattern)?
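A simplified sketch of that singleton-style approach (assuming the existing SparkEnv class; not a final implementation) could look like:

    // Simplified sketch of the proposed change to object SparkEnv: the
    // ThreadLocal is replaced by a single @volatile field.
    object SparkEnv {
      @volatile private var env: SparkEnv = _

      def set(e: SparkEnv): Unit = { env = e }

      def get: SparkEnv = env

      // One assignment clears the reference for every thread after a stop.
      private[spark] def clear(): Unit = { env = null }
    }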

davies (Contributor, Author) commented Oct 2, 2014

@JoshRosen I'd like to move in that direction, but getThreadLocal() is a public API, so I'm hesitant to remove it.

In the past, I remember we could have two different SparkEnvs in the same JVM: a separate SparkEnv for the executor to hold different TrackerClients. Maybe that's not needed anymore.

Alternatively, we could keep getThreadLocal() but deprecate it, and remove the ThreadLocal object. How does that sound?

JoshRosen (Contributor) commented:

@davies The SparkEnv class is marked @DeveloperApi and has this note in its Scaladoc:

 * NOTE: This is not intended for external use. This is exposed for Shark and may be made private
 *       in a future release.

Nearly every object that's accessible through SparkEnv's public fields is private[spark]. I'd be very surprised if there was third-party code that was relying on SparkEnv remaining backwards-compatible.

JoshRosen (Contributor) commented:

@davies:

In the past, I remember we could have two different SparkEnvs in the same JVM: a separate SparkEnv for the executor to hold different TrackerClients. Maybe that's not needed anymore.

SparkEnv.create is only called in two places: the SparkContext and Executor constructors.

In Executor, we only create a new SparkEnv if we're not running in local mode:

  // Initialize Spark environment (using system properties read above)
  private val env = {
    if (!isLocal) {
      val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
        isDriver = false, isLocal = false)
      SparkEnv.set(_env)
      _env.metricsSystem.registerSource(executorSource)
      _env
    } else {
      SparkEnv.get
    }
  }

Since we only have one Executor instance per JVM, we only have one SparkEnv per JVM (since we don't currently support concurrently-running SparkContexts in the same JVM).

SparkQA commented Oct 2, 2014

QA tests have started for PR 2624 at commit ee62bb7.

  • This patch merges cleanly.

JoshRosen (Contributor) commented:

We should probably update the docstrings to remove all references to ThreadLocal, too.

SparkQA commented Oct 3, 2014

QA tests have finished for PR 2624 at commit ee62bb7.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

AmplabJenkins commented:

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21232/

JoshRosen (Contributor) commented:

Hmm, strange test failure:

[info] - block generator throttling *** FAILED ***
[info]   org.scalatest.exceptions.TestFailedException was thrown. (NetworkReceiverSuite.scala:180)

This is a known flaky test, though, so let's try running the tests again.

Jenkins, retest this please.

SparkQA commented Oct 3, 2014

QA tests have started for PR 2624 at commit ba77ca4.

  • This patch merges cleanly.

SparkQA commented Oct 3, 2014

QA tests have finished for PR 2624 at commit ba77ca4.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • protected case class Keyword(str: String)

AmplabJenkins commented:

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21237/

tdas (Contributor) commented Oct 3, 2014

Okay, this is a pretty significant change to remove the ThreadLocal object completely. There are two things we can do:

  • Either we do something minimal to just clear the reference, so that repeated SparkContext creation works from PySpark.
  • Or we do this scary refactoring and eliminate the ThreadLocal completely. I know that there are probably no use cases where the ThreadLocal object on any thread would be different from the global lastSparkEnv, but are we entirely sure?

I think we need more input from others @pwendell @mateiz @rxin

JoshRosen (Contributor) commented:

Either we do something minimal to just clear the reference, so that repeated SparkContext creation works from PySpark.

I'm not sure there's an easy, minimal approach that's also correct, though. The problem is that some threads obtain a SparkEnv by calling SparkEnv.get, so those threads are prone to reading old ThreadLocals that haven't been cleaned up. For an approach that clears ThreadLocals to be safe, I think we'd need some way to ensure that any thread that sets the ThreadLocal eventually clears it before the thread is re-used. I suppose we could audit all calls to SparkEnv.set() and add the equivalent of a try ... finally to ensure the ThreadLocal is eventually cleared. This is starting to get complex, though, and I'm not sure it's any simpler than just removing the ThreadLocals.
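For example, each audited call site could be wrapped along these lines (a purely hypothetical helper, not existing Spark code):

    // Hypothetical wrapper: guarantees a pooled thread never keeps a stale
    // ThreadLocal SparkEnv after the work that set it has finished.
    def runWithEnv[T](env: SparkEnv)(body: => T): T = {
      SparkEnv.set(env)
      try {
        body
      } finally {
        SparkEnv.set(null)   // or a dedicated remove/clear method
      }
    }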

JoshRosen (Contributor) commented:

It's worth noting that the ThreadLocals haven't seemed to cause problems in any of the existing uses of Spark / PySpark. In PySpark Streaming, I think we're running into a scenario that's something like this:

  • Java invokes a Python callback through the Py4J callback server. Internally, the callback server uses some thread pool.
  • The Python callback calls back into Java through Py4J.
  • Somewhere along the line, SparkEnv.set() is called, leaking the current SparkEnv into one of the Py4J GatewayServer or CallbackServer pool threads.
  • This thread is re-used when a new Python SparkContext is created using the same GatewayServer.

I thought of another fix that would allow the ThreadLocals to work: add a mutable field to SparkEnv instances that records whether that environment is associated with a SparkContext that's been stopped. In SparkEnv.get(), we can check this field to determine whether to return the ThreadLocal or return lastSparkEnv. This approach is more confusing / complex than removing the ThreadLocals, though.
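Roughly (field names are approximate, and 'stopped' would be the new flag; this is a sketch, not actual code):

    // Sketch of the alternative fix: get() ignores a thread-local env whose
    // context has been stopped and falls back to the last globally-set env.
    def get: SparkEnv = {
      val local = env.get()                        // env: ThreadLocal[SparkEnv]
      if (local != null && !local.stopped) local   // 'stopped': new mutable flag
      else lastSetSparkEnv
    }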

I'm still strongly in favor of doing the work to confirm that SparkEnv is currently used as though it's a global object and then removing the ThreadLocals.

tdas (Contributor) commented Oct 4, 2014

@JoshRosen the simple fix is to delete the ThreadLocal variable completely. Then any access from any thread (even a thread in the Py4J pool) will see the reset value.

What you said is definitely the structured solution and I am not opposed to it. However, that is a bigger endeavor and I don't want the PySpark Streaming patches to be blocked on it. The simple intermediate fix is safer and correct (though not ideal, I agree) as it does not change any code paths that can be accessed while a SparkContext is active.

/**
* Returns the ThreadLocal SparkEnv.
*/
def getThreadLocal: SparkEnv = {
Review comment (Contributor):
You should leave this in instead of removing it, because some user code might be calling this public method.

Review comment (Contributor):
(Even though it's a DeveloperApi we shouldn't break it if we can help it)

Review comment (Contributor):
Good point. Let's throw @deprecated on it when we put it back, though.

JoshRosen (Contributor) commented:

@mateiz Aside from restoring the getThreadLocal method in order to preserve API compatibility, is this patch otherwise ready to merge?

davies (Contributor, Author) commented Oct 7, 2014

@mateiz @JoshRosen I've put getThreadLocal() back and deprecated it.
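The restored accessor now looks roughly like this (simplified; the deprecation message and version string here are illustrative):

    /** Returns the SparkEnv (kept only for backwards compatibility). */
    @deprecated("Use SparkEnv.get instead", "1.2")
    def getThreadLocal: SparkEnv = env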

SparkQA commented Oct 7, 2014

QA tests have started for PR 2624 at commit a69f30c.

  • This patch merges cleanly.

SparkQA commented Oct 7, 2014

QA tests have finished for PR 2624 at commit a69f30c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

AmplabJenkins commented:

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21368/

mateiz (Contributor) commented Oct 7, 2014

Yup, looks good! I'm going to merge it.

asfgit closed this in commit 6550329 on Oct 7, 2014
kayousterhout pushed a commit to kayousterhout/spark-1 that referenced this pull request Dec 2, 2014
This commit removes the SparkEnv thread local variable, and instead
makes SparkEnv a single global variable (which is how SparkEnv has
always been used).  This significantly simplifies how SparkEnv needs to
be passed around for monotasks.

This is a cherry-picked version of Apache Spark commit
6550329 to simplify handling
of SparkEnv.

Author: Davies Liu <davies.liu@gmail.com>

Closes apache#2624 from davies/env and squashes the following commits:

a69f30c [Davies Liu] deprecate getThreadLocal
ba77ca4 [Davies Liu] remove getThreadLocal(), update docs
ee62bb7 [Davies Liu] cleanup ThreadLocal of SparnENV
4d0ea8b [Davies Liu] clear reference of SparkEnv after stop