
Commit b62ef8f

HeartSaVioR authored and Marcelo Vanzin committed
[SPARK-29007][STREAMING][MLLIB][TESTS] Enforce not leaking SparkContext in tests which creates new StreamingContext with new SparkContext
### What changes were proposed in this pull request?

This patch enforces tests to prevent leaking the newly created SparkContext when it is created while initializing a StreamingContext. A SparkContext leaked in one test can make most of the following tests fail as well, so this patch applies defensive programming, trying its best to ensure the SparkContext is cleaned up.

### Why are the changes needed?

We have seen cases in CI builds where a SparkContext is leaked and other tests are affected by the leaked SparkContext. Ideally we should isolate the environment among tests where possible.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Modified UTs.

Closes #25709 from HeartSaVioR/SPARK-29007.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
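For context, the fix routes every suite's StreamingContext through a shared LocalStreamingContext mix-in instead of ad-hoc before/after blocks. The trait itself is added under the spark-streaming test sources (which is why the test-jar dependencies below are needed); the following is only a minimal sketch of the shape such a trait takes, not the exact code from this commit:

```scala
import org.scalatest.{BeforeAndAfterEach, Suite}

import org.apache.spark.streaming.StreamingContext

// Sketch only: approximates the LocalStreamingContext mix-in referenced by the
// suites below; the real trait lives in spark-streaming's test sources.
trait LocalStreamingContext extends BeforeAndAfterEach { self: Suite =>

  @transient var ssc: StreamingContext = _

  override def afterEach(): Unit = {
    try {
      if (ssc != null) {
        // Stop the StreamingContext and the SparkContext underneath it, so a
        // failing test cannot leak a SparkContext into the suites that follow.
        ssc.stop(stopSparkContext = true)
        ssc = null
      }
    } finally {
      super.afterEach()
    }
  }
}
```

Suites that mix this in simply assign the context they create to `ssc` and let `afterEach()` tear it down, even when the test body throws.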
1 parent 2736efa commit b62ef8f

21 files changed: +240 -241 lines changed


external/kafka-0-10/pom.xml

Lines changed: 7 additions & 0 deletions
@@ -45,6 +45,13 @@
       <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
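The test-jar dependency is what puts the streaming module's test utilities (such as the LocalStreamingContext mix-in used below) on this module's test classpath. For anyone consuming these test helpers from an sbt build instead of Maven, the rough equivalent is the classifier-based form below (a sketch; `sparkVersion` is a placeholder for the Spark version in use):

```scala
// build.sbt (sketch): depend on spark-streaming's test-jar, analogous to the
// <type>test-jar</type> Maven dependency added above.
libraryDependencies +=
  "org.apache.spark" %% "spark-streaming" % sparkVersion % Test classifier "tests"
```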

external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala

Lines changed: 11 additions & 13 deletions
@@ -18,8 +18,8 @@
 package org.apache.spark.streaming.kafka010
 
 import java.io.File
-import java.lang.{ Long => JLong }
-import java.util.{ Arrays, HashMap => JHashMap, Map => JMap, UUID }
+import java.lang.{Long => JLong}
+import java.util.{Arrays, HashMap => JHashMap, Map => JMap, UUID}
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicLong
@@ -31,22 +31,20 @@ import scala.util.Random
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.StringDeserializer
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
 import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
+import org.apache.spark.streaming.{LocalStreamingContext, Milliseconds, StreamingContext, Time}
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.scheduler._
 import org.apache.spark.streaming.scheduler.rate.RateEstimator
 import org.apache.spark.util.Utils
 
 class DirectKafkaStreamSuite
   extends SparkFunSuite
-  with BeforeAndAfter
-  with BeforeAndAfterAll
+  with LocalStreamingContext
   with Eventually
   with Logging {
   val sparkConf = new SparkConf()
@@ -56,7 +54,6 @@ class DirectKafkaStreamSuite
     // Otherwise the poll timeout defaults to 2 minutes and causes test cases to run longer.
     .set("spark.streaming.kafka.consumer.poll.ms", "10000")
 
-  private var ssc: StreamingContext = _
   private var testDir: File = _
 
   private var kafkaTestUtils: KafkaTestUtils = _
@@ -78,12 +75,13 @@ class DirectKafkaStreamSuite
     }
   }
 
-  after {
-    if (ssc != null) {
-      ssc.stop(stopSparkContext = true)
-    }
-    if (testDir != null) {
-      Utils.deleteRecursively(testDir)
+  override def afterEach(): Unit = {
+    try {
+      if (testDir != null) {
+        Utils.deleteRecursively(testDir)
+      }
+    } finally {
+      super.afterEach()
     }
   }
 

external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala

Lines changed: 15 additions & 22 deletions
@@ -30,7 +30,7 @@ import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.{StorageLevel, StreamBlockId}
-import org.apache.spark.streaming._
+import org.apache.spark.streaming.{LocalStreamingContext, _}
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
 import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
 import org.apache.spark.streaming.kinesis.KinesisReadConfigurations._
@@ -40,7 +40,7 @@ import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
 import org.apache.spark.util.Utils
 
 abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFunSuite
-  with Eventually with BeforeAndAfter with BeforeAndAfterAll {
+  with LocalStreamingContext with Eventually with BeforeAndAfter with BeforeAndAfterAll {
 
   // This is the name that KCL will use to save metadata to DynamoDB
   private val appName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}"
@@ -53,15 +53,9 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
   private val dummyAWSSecretKey = "dummySecretKey"
 
   private var testUtils: KinesisTestUtils = null
-  private var ssc: StreamingContext = null
   private var sc: SparkContext = null
 
   override def beforeAll(): Unit = {
-    val conf = new SparkConf()
-      .setMaster("local[4]")
-      .setAppName("KinesisStreamSuite") // Setting Spark app name to Kinesis app name
-    sc = new SparkContext(conf)
-
     runIfTestsEnabled("Prepare KinesisTestUtils") {
       testUtils = new KPLBasedKinesisTestUtils()
       testUtils.createStream()
@@ -70,12 +64,6 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
 
   override def afterAll(): Unit = {
     try {
-      if (ssc != null) {
-        ssc.stop()
-      }
-      if (sc != null) {
-        sc.stop()
-      }
       if (testUtils != null) {
         // Delete the Kinesis stream as well as the DynamoDB table generated by
         // Kinesis Client Library when consuming the stream
@@ -87,17 +75,22 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     }
   }
 
-  before {
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    val conf = new SparkConf()
+      .setMaster("local[4]")
+      .setAppName("KinesisStreamSuite") // Setting Spark app name to Kinesis app name
+    sc = new SparkContext(conf)
     ssc = new StreamingContext(sc, batchDuration)
   }
 
-  after {
-    if (ssc != null) {
-      ssc.stop(stopSparkContext = false)
-      ssc = null
-    }
-    if (testUtils != null) {
-      testUtils.deleteDynamoDBTable(appName)
+  override def afterEach(): Unit = {
+    try {
+      if (testUtils != null) {
+        testUtils.deleteDynamoDBTable(appName)
+      }
+    } finally {
+      super.afterEach()
     }
   }
 
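As the Kinesis suite shows, a suite that mixes in LocalStreamingContext but also owns extra per-test resources follows a fixed convention: call super.beforeEach() first, create the contexts and resources, and in afterEach() clean up the suite's own resources inside a try block while deferring to super.afterEach() in the finally block, so the context is stopped even when that cleanup throws. A sketch of the pattern (the suite name, temp directory, and master/app-name settings are illustrative, not taken from the PR):

```scala
import java.io.File
import java.nio.file.Files

import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.streaming.{LocalStreamingContext, Seconds, StreamingContext}
import org.apache.spark.util.Utils

// Sketch of the per-test lifecycle convention used by the suites in this commit.
class ExampleStreamingSuite extends SparkFunSuite with LocalStreamingContext {

  private var testDir: File = _

  override def beforeEach(): Unit = {
    super.beforeEach()
    val conf = new SparkConf().setMaster("local[2]").setAppName("ExampleStreamingSuite")
    ssc = new StreamingContext(new SparkContext(conf), Seconds(1))
    testDir = Files.createTempDirectory("example-streaming").toFile
  }

  override def afterEach(): Unit = {
    try {
      // Suite-specific cleanup goes first ...
      if (testDir != null) {
        Utils.deleteRecursively(testDir)
      }
    } finally {
      // ... and the trait's cleanup always runs, stopping ssc and its SparkContext.
      super.afterEach()
    }
  }

  test("streaming context is created per test") {
    assert(ssc != null)
  }
}
```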

mllib/pom.xml

Lines changed: 7 additions & 0 deletions
@@ -55,6 +55,13 @@
       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql_${scala.binary.version}</artifactId>

mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala

Lines changed: 5 additions & 11 deletions
@@ -23,23 +23,17 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.util.TestingUtils._
-import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.apache.spark.streaming.{LocalStreamingContext, TestSuiteBase}
 import org.apache.spark.streaming.dstream.DStream
 
-class StreamingLogisticRegressionSuite extends SparkFunSuite with TestSuiteBase {
+class StreamingLogisticRegressionSuite
+  extends SparkFunSuite
+  with LocalStreamingContext
+  with TestSuiteBase {
 
   // use longer wait time to ensure job completion
   override def maxWaitTimeMillis: Int = 30000
 
-  var ssc: StreamingContext = _
-
-  override def afterFunction() {
-    super.afterFunction()
-    if (ssc != null) {
-      ssc.stop()
-    }
-  }
-
   // Test if we can accurately learn B for Y = logistic(BX) on streaming data
   test("parameter accuracy") {

mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala

Lines changed: 2 additions & 11 deletions
@@ -20,23 +20,14 @@ package org.apache.spark.mllib.clustering
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.util.TestingUtils._
-import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.apache.spark.streaming.{LocalStreamingContext, TestSuiteBase}
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.util.random.XORShiftRandom
 
-class StreamingKMeansSuite extends SparkFunSuite with TestSuiteBase {
+class StreamingKMeansSuite extends SparkFunSuite with LocalStreamingContext with TestSuiteBase {
 
   override def maxWaitTimeMillis: Int = 30000
 
-  var ssc: StreamingContext = _
-
-  override def afterFunction() {
-    super.afterFunction()
-    if (ssc != null) {
-      ssc.stop()
-    }
-  }
-
   test("accuracy for single center and equivalence to grand average") {
     // set parameters
     val numBatches = 10

mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala

Lines changed: 5 additions & 11 deletions
@@ -22,23 +22,17 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.util.LinearDataGenerator
-import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.apache.spark.streaming.{LocalStreamingContext, TestSuiteBase}
 import org.apache.spark.streaming.dstream.DStream
 
-class StreamingLinearRegressionSuite extends SparkFunSuite with TestSuiteBase {
+class StreamingLinearRegressionSuite
+  extends SparkFunSuite
+  with LocalStreamingContext
+  with TestSuiteBase {
 
   // use longer wait time to ensure job completion
   override def maxWaitTimeMillis: Int = 20000
 
-  var ssc: StreamingContext = _
-
-  override def afterFunction() {
-    super.afterFunction()
-    if (ssc != null) {
-      ssc.stop()
-    }
-  }
-
   // Assert that two values are equal within tolerance epsilon
   def assertEqual(v1: Double, v2: Double, epsilon: Double) {
     def errorMessage = v1.toString + " did not equal " + v2.toString

streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala

Lines changed: 7 additions & 11 deletions
@@ -39,8 +39,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.scheduler._
-import org.apache.spark.util.{Clock, ManualClock, MutableURLClassLoader, ResetSystemProperties,
-  Utils}
+import org.apache.spark.util.{Clock, ManualClock, MutableURLClassLoader, ResetSystemProperties, Utils}
 
 /**
  * A input stream that records the times of restore() invoked
@@ -206,24 +205,21 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
  * the checkpointing of a DStream's RDDs as well as the checkpointing of
  * the whole DStream graph.
  */
-class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
+class CheckpointSuite extends TestSuiteBase with LocalStreamingContext with DStreamCheckpointTester
   with ResetSystemProperties {
 
-  var ssc: StreamingContext = null
-
   override def batchDuration: Duration = Milliseconds(500)
 
-  override def beforeFunction() {
-    super.beforeFunction()
+  override def beforeEach(): Unit = {
+    super.beforeEach()
     Utils.deleteRecursively(new File(checkpointDir))
   }
 
-  override def afterFunction() {
+  override def afterEach(): Unit = {
     try {
-      if (ssc != null) { ssc.stop() }
       Utils.deleteRecursively(new File(checkpointDir))
     } finally {
-      super.afterFunction()
+      super.afterEach()
     }
   }
 
@@ -255,7 +251,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
         .checkpoint(stateStreamCheckpointInterval)
         .map(t => (t._1, t._2))
     }
-    var ssc = setupStreams(input, operation)
+    ssc = setupStreams(input, operation)
     var stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
 
     def waitForCompletionOfBatch(numBatches: Long): Unit = {
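One subtle but important change above: the test previously wrote `var ssc = setupStreams(input, operation)`, declaring a local variable that shadowed the suite-level `ssc` field, so cleanup code looking at the field never saw the context created inside the test. Dropping the `var` assigns the field inherited from LocalStreamingContext instead, which lets the shared afterEach() stop it. A simplified illustration of the pitfall (names are hypothetical, not from the PR):

```scala
import org.apache.spark.streaming.StreamingContext

// Simplified illustration of variable shadowing; not code from this commit.
trait ShadowingExample {
  var ssc: StreamingContext = _                // field that shared cleanup stops

  def setupStreams(): StreamingContext = ???   // stand-in for TestSuiteBase.setupStreams

  def leakyTestBody(): Unit = {
    var ssc = setupStreams()                   // local var shadows the field; afterEach never sees it
    ssc.start()
  }

  def fixedTestBody(): Unit = {
    ssc = setupStreams()                       // assign the inherited field so cleanup can stop it
    ssc.start()
  }
}
```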

streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala

Lines changed: 3 additions & 13 deletions
@@ -29,24 +29,14 @@ import org.apache.spark.util.ReturnStatementInClosureException
 /**
  * Test that closures passed to DStream operations are actually cleaned.
  */
-class DStreamClosureSuite extends SparkFunSuite with BeforeAndAfterAll {
-  private var ssc: StreamingContext = null
+class DStreamClosureSuite extends SparkFunSuite with LocalStreamingContext with BeforeAndAfterAll {
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
 
-  override def beforeAll(): Unit = {
-    super.beforeAll()
     val sc = new SparkContext("local", "test")
     ssc = new StreamingContext(sc, Seconds(1))
   }
 
-  override def afterAll(): Unit = {
-    try {
-      ssc.stop(stopSparkContext = true)
-      ssc = null
-    } finally {
-      super.afterAll()
-    }
-  }
-
   test("user provided closures are actually cleaned") {
     val dstream = new DummyInputDStream(ssc)
     val pairDstream = dstream.map { i => (i, i) }

streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala

Lines changed: 12 additions & 11 deletions
@@ -30,28 +30,29 @@ import org.apache.spark.util.ManualClock
 /**
  * Tests whether scope information is passed from DStream operations to RDDs correctly.
  */
-class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterAll {
-  private var ssc: StreamingContext = null
-  private val batchDuration: Duration = Seconds(1)
+class DStreamScopeSuite
+  extends SparkFunSuite
+  with LocalStreamingContext {
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
 
-  override def beforeAll(): Unit = {
-    super.beforeAll()
     val conf = new SparkConf().setMaster("local").setAppName("test")
     conf.set("spark.streaming.clock", classOf[ManualClock].getName())
+    val batchDuration: Duration = Seconds(1)
     ssc = new StreamingContext(new SparkContext(conf), batchDuration)
+
+    assertPropertiesNotSet()
   }
 
-  override def afterAll(): Unit = {
+  override def afterEach(): Unit = {
     try {
-      ssc.stop(stopSparkContext = true)
+      assertPropertiesNotSet()
    } finally {
-      super.afterAll()
+      super.afterEach()
     }
   }
 
-  before { assertPropertiesNotSet() }
-  after { assertPropertiesNotSet() }
-
   test("dstream without scope") {
     val dummyStream = new DummyDStream(ssc)
     dummyStream.initialize(Time(0))