
Commit 40bce63

jacek-lewandowski authored and JoshRosen committed
SPARK-5425: Use synchronised methods in system properties to create SparkConf
SPARK-5425: Fixed usages of system properties.

This patch fixes a few problems caused by the fact that the Scala wrapper over system properties is not thread-safe, and is effectively broken besides, because it does not take into account default values that may have been set in the underlying `Properties` object. The fix modifies `Utils.getSystemProperties` to use the `stringPropertyNames` method of the `Properties` class, which is thread-safe (internally it creates a defensive copy in a synchronized method) and returns the keys of both the properties that were set explicitly and those defined only as defaults.

The other related problem fixed here was in the `ResetSystemProperties` mix-in, which created its copy of the system properties in the wrong way.

This patch also introduces a test case for the thread-safety of `SparkConf` creation. Refer to the discussion in #4220 for more details.

Author: Jacek Lewandowski <lewandowski.jacek@gmail.com>

Closes #4220 from jacek-lewandowski/SPARK-5425-1.1 and squashes the following commits:

6c48a1f [Jacek Lewandowski] SPARK-5425: Modified Utils.getSystemProperties to return a map of all system properties - explicit + defaults
74b4489 [Jacek Lewandowski] SPARK-5425: Use SerializationUtils to save properties in ResetSystemProperties trait
685780e [Jacek Lewandowski] SPARK-5425: Use synchronised methods in system properties to create SparkConf
1 parent e19c70f commit 40bce63
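The race the commit message describes is easy to demonstrate outside Spark. Below is a minimal standalone sketch, not part of the commit (the `PropsRace` object and the `demo.` key prefix are made up), contrasting iteration over the live `Properties` object with iteration over the defensive copy returned by `stringPropertyNames`:

import scala.collection.JavaConverters._
import scala.util.Random

object PropsRace {
  def main(args: Array[String]): Unit = {
    // A background writer keeps mutating the global Properties object.
    val writer = new Thread(new Runnable {
      override def run(): Unit =
        while (true) System.setProperty("demo." + Random.nextInt(), "x")
    })
    writer.setDaemon(true)
    writer.start()

    // Iterating the live Hashtable through the Scala wrapper races with the
    // writer and can fail with a ConcurrentModificationException.
    for (_ <- 1 to 1000) System.getProperties.asScala.foreach(identity)

    // stringPropertyNames() builds a defensive copy inside a synchronized
    // method, so iterating its result never races with the writer.
    for (_ <- 1 to 1000) System.getProperties.stringPropertyNames().asScala.foreach(identity)
  }
}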

5 files changed: +46, −8 lines

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 4 additions & 3 deletions
@@ -17,9 +17,10 @@
 
 package org.apache.spark
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 
+import org.apache.spark.util.Utils
+
 /**
  * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
  *
@@ -49,8 +50,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
   if (loadDefaults) {
     // Load any spark.* system properties
-    for ((k, v) <- System.getProperties.asScala if k.startsWith("spark.")) {
-      settings(k) = v
+    for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
+      settings(key) = value
     }
   }
 
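From the caller's perspective nothing changes: any `spark.*` system property still flows into a freshly constructed `SparkConf`. A hypothetical usage sketch (the `spark.test.answer` key is invented for illustration):

import org.apache.spark.SparkConf

// Properties prefixed with "spark." are copied into the conf at construction.
System.setProperty("spark.test.answer", "42")
val conf = new SparkConf(loadDefaults = true)
assert(conf.get("spark.test.answer") == "42")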

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 8 additions & 3 deletions
@@ -1227,9 +1227,14 @@ private[spark] object Utils extends Logging {
     hashAbs
   }
 
-  /** Returns a copy of the system properties that is thread-safe to iterator over. */
-  def getSystemProperties(): Map[String, String] = {
-    System.getProperties.clone().asInstanceOf[java.util.Properties].toMap[String, String]
+  /** Returns the system properties map that is thread-safe to iterate over. It gets the
+    * properties which have been set explicitly, as well as those for which only a default value
+    * has been defined. */
+  def getSystemProperties: Map[String, String] = {
+    val sysProps = for (key <- System.getProperties.stringPropertyNames()) yield
+      (key, System.getProperty(key))
+
+    sysProps.toMap
   }
 
   /**
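Why `stringPropertyNames` rather than `clone()`? The difference shows up as soon as a `Properties` object carries defaults. A small sketch using plain `java.util.Properties`, independent of Spark (all keys are made up):

import java.util.Properties

val defaults = new Properties()
defaults.setProperty("spark.fallback", "default-value")

// The one-argument constructor wires `defaults` in as defaults only; it
// copies nothing into the new object's own Hashtable storage.
val props = new Properties(defaults)
props.setProperty("spark.explicit", "explicit-value")

// Hashtable-level views (what clone() and the Scala wrappers expose) miss
// the defaults entirely:
assert(props.keySet().size == 1)

// stringPropertyNames() walks the whole default chain and returns a copy:
assert(props.stringPropertyNames().size == 2)
assert(props.getProperty("spark.fallback") == "default-value")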

core/src/test/scala/org/apache/spark/SparkConfSuite.scala

Lines changed: 25 additions & 0 deletions
@@ -17,6 +17,10 @@
 
 package org.apache.spark
 
+import java.util.concurrent.{TimeUnit, Executors}
+
+import scala.util.{Try, Random}
+
 import org.scalatest.FunSuite
 
 import org.apache.spark.util.ResetSystemProperties
@@ -121,4 +125,25 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro
     assert(conf.get("spark.test.a.b") === "A.B")
     assert(conf.get("spark.test.a.b.c") === "a.b.c")
   }
+
+  test("Thread safeness - SPARK-5425") {
+    import scala.collection.JavaConversions._
+    val executor = Executors.newSingleThreadScheduledExecutor()
+    val sf = executor.scheduleAtFixedRate(new Runnable {
+      override def run(): Unit =
+        System.setProperty("spark.5425." + Random.nextInt(), Random.nextInt().toString)
+    }, 0, 1, TimeUnit.MILLISECONDS)
+
+    try {
+      val t0 = System.currentTimeMillis()
+      while ((System.currentTimeMillis() - t0) < 1000) {
+        val conf = Try(new SparkConf(loadDefaults = true))
+        assert(conf.isSuccess === true)
+      }
+    } finally {
+      executor.shutdownNow()
+      for (key <- System.getProperties.stringPropertyNames() if key.startsWith("spark.5425."))
+        System.getProperties.remove(key)
+    }
+  }
 }

core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala

Lines changed: 6 additions & 1 deletion
@@ -19,6 +19,7 @@ package org.apache.spark.util
 
 import java.util.Properties
 
+import org.apache.commons.lang3.SerializationUtils
 import org.scalatest.{BeforeAndAfterEach, Suite}
 
 /**
@@ -42,7 +43,11 @@ private[spark] trait ResetSystemProperties extends BeforeAndAfterEach { this: Su
   var oldProperties: Properties = null
 
   override def beforeEach(): Unit = {
-    oldProperties = new Properties(System.getProperties)
+    // we need SerializationUtils.clone instead of `new Properties(System.getProperties())` because
+    // the latter way of creating a copy does not copy the properties but initializes a new
+    // Properties object with the given properties as defaults. They are then not recognized at
+    // all by the standard Scala wrapper over Java Properties.
+    oldProperties = SerializationUtils.clone(System.getProperties)
     super.beforeEach()
   }
 
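A sketch of the failure mode the new comment describes, again independent of any test framework: the pseudo-copy aliases the live properties through the defaults chain, so it is useless as a backup, while `SerializationUtils.clone` yields a genuinely independent deep copy (the `spark.leak` keys are made up; a real test would remove them afterwards).

import java.util.Properties
import org.apache.commons.lang3.SerializationUtils

val live = System.getProperties

// Looks like a copy, but only registers `live` as the defaults:
val fakeBackup = new Properties(live)
System.setProperty("spark.leak", "oops")
// Later writes show through, so "restoring" from it undoes nothing:
assert(fakeBackup.getProperty("spark.leak") == "oops")

// SerializationUtils.clone serializes and deserializes, detaching the copy:
val realBackup = SerializationUtils.clone(live)
System.setProperty("spark.leak2", "oops")
assert(realBackup.getProperty("spark.leak2") == null)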

examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala

Lines changed: 3 additions & 1 deletion
@@ -19,6 +19,8 @@ package org.apache.spark.examples
 
 import scala.collection.JavaConversions._
 
+import org.apache.spark.util.Utils
+
 /** Prints out environmental information, sleeps, and then exits. Made to
   * test driver submission in the standalone scheduler. */
 object DriverSubmissionTest {
@@ -30,7 +32,7 @@ object DriverSubmissionTest {
     val numSecondsToSleep = args(0).toInt
 
     val env = System.getenv()
-    val properties = System.getProperties()
+    val properties = Utils.getSystemProperties
 
     println("Environment variables containing SPARK_TEST:")
     env.filter{case (k, v) => k.contains("SPARK_TEST")}.foreach(println)
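Since `Utils.getSystemProperties` returns an immutable Scala `Map`, the example's property printout can use the same filter-and-print style as the environment block above, with no conversion wrapper over a live `Properties` object. A sketch (the `spark.test` substring mirrors the `SPARK_TEST` filter and is assumed here):

import org.apache.spark.util.Utils

// The snapshot cannot change mid-iteration, even if another thread calls
// System.setProperty while we print.
Utils.getSystemProperties
  .filter { case (k, _) => k.contains("spark.test") }
  .foreach(println)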
