Skip to content

Commit b3adb53

Browse files
ericl authored and gatorsmile committed
[SPARK-23971][BACKPORT-2.3] Should not leak Spark sessions across test suites
This PR backports PR #21058 to Apache Spark 2.3. The session leak it fixes is likely the reason we saw test regressions in the Apache Spark 2.3 branches:

- https://amplab.cs.berkeley.edu/jenkins/job/spark-branch-2.3-test-sbt-hadoop-2.6/317/testReport/org.apache.spark.sql.execution.datasources.parquet/ParquetQuerySuite/SPARK_15678__not_use_cache_on_overwrite/history/
- https://amplab.cs.berkeley.edu/jenkins/job/spark-branch-2.3-test-sbt-hadoop-2.7/318/testReport/junit/org.apache.spark.sql/DataFrameSuite/inputFiles/history/

---

## What changes were proposed in this pull request?

Many suites currently leak Spark sessions (sometimes with stopped SparkContexts) via the thread-local active Spark session and the default Spark session. We should attempt to clean these up and detect when this happens to improve the reproducibility of tests.

## How was this patch tested?

Existing tests.

Author: Eric Liang <ekl@databricks.com>

Closes #21197 from gatorsmile/backportSPARK-23971.
1 parent 88abf7b commit b3adb53

File tree

4 files changed

+47
-9
lines changed

4 files changed

+47
-9
lines changed

mllib/src/test/java/org/apache/spark/SharedSparkSession.java

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -42,7 +42,12 @@ public void setUp() throws IOException {
4242

4343
@After
4444
public void tearDown() {
45-
spark.stop();
46-
spark = null;
45+
try {
46+
spark.stop();
47+
spark = null;
48+
} finally {
49+
SparkSession.clearDefaultSession();
50+
SparkSession.clearActiveSession();
51+
}
4752
}
4853
}

sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala

Lines changed: 21 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -44,7 +44,7 @@ import org.apache.spark.sql.sources.BaseRelation
4444
import org.apache.spark.sql.streaming._
4545
import org.apache.spark.sql.types.{DataType, StructType}
4646
import org.apache.spark.sql.util.ExecutionListenerManager
47-
import org.apache.spark.util.Utils
47+
import org.apache.spark.util.{CallSite, Utils}
4848

4949

5050
/**
@@ -81,6 +81,9 @@ class SparkSession private(
8181
@transient private[sql] val extensions: SparkSessionExtensions)
8282
extends Serializable with Closeable with Logging { self =>
8383

84+
// The call site where this SparkSession was constructed.
85+
private val creationSite: CallSite = Utils.getCallSite()
86+
8487
private[sql] def this(sc: SparkContext) {
8588
this(sc, None, None, new SparkSessionExtensions)
8689
}
@@ -763,7 +766,7 @@ class SparkSession private(
763766

764767

765768
@InterfaceStability.Stable
766-
object SparkSession {
769+
object SparkSession extends Logging {
767770

768771
/**
769772
* Builder for [[SparkSession]].
@@ -1079,4 +1082,20 @@ object SparkSession {
10791082
}
10801083
}
10811084

1085+
private[spark] def cleanupAnyExistingSession(): Unit = {
1086+
val session = getActiveSession.orElse(getDefaultSession)
1087+
if (session.isDefined) {
1088+
logWarning(
1089+
s"""An existing Spark session exists as the active or default session.
1090+
|This probably means another suite leaked it. Attempting to stop it before continuing.
1091+
|This existing Spark session was created at:
1092+
|
1093+
|${session.get.creationSite.longForm}
1094+
|
1095+
""".stripMargin)
1096+
session.get.stop()
1097+
SparkSession.clearActiveSession()
1098+
SparkSession.clearDefaultSession()
1099+
}
1100+
}
10821101
}

sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -43,6 +43,8 @@ class SessionStateSuite extends SparkFunSuite {
4343
if (activeSession != null) {
4444
activeSession.stop()
4545
activeSession = null
46+
SparkSession.clearActiveSession()
47+
SparkSession.clearDefaultSession()
4648
}
4749
super.afterAll()
4850
}

sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala

Lines changed: 17 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -60,6 +60,7 @@ trait SharedSparkSession
6060
protected implicit def sqlContext: SQLContext = _spark.sqlContext
6161

6262
protected def createSparkSession: TestSparkSession = {
63+
SparkSession.cleanupAnyExistingSession()
6364
new TestSparkSession(sparkConf)
6465
}
6566

@@ -92,11 +93,22 @@ trait SharedSparkSession
9293
* Stop the underlying [[org.apache.spark.SparkContext]], if any.
9394
*/
9495
protected override def afterAll(): Unit = {
95-
super.afterAll()
96-
if (_spark != null) {
97-
_spark.sessionState.catalog.reset()
98-
_spark.stop()
99-
_spark = null
96+
try {
97+
super.afterAll()
98+
} finally {
99+
try {
100+
if (_spark != null) {
101+
try {
102+
_spark.sessionState.catalog.reset()
103+
} finally {
104+
_spark.stop()
105+
_spark = null
106+
}
107+
}
108+
} finally {
109+
SparkSession.clearActiveSession()
110+
SparkSession.clearDefaultSession()
111+
}
100112
}
101113
}
102114

0 commit comments

Comments
 (0)