Commit bc46dda

Author: Davies Liu
Commit message: thread safety
1 parent b4cd73e · commit bc46dda

7 files changed, 38 additions and 65 deletions

core/src/main/scala/org/apache/spark/SerializableWritable.scala

Lines changed: 7 additions & 5 deletions
@@ -19,9 +19,9 @@ package org.apache.spark
 
 import java.io._
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.ObjectWritable
 import org.apache.hadoop.io.Writable
+import org.apache.spark.deploy.SparkHadoopUtil
 
 import org.apache.spark.annotation.DeveloperApi
 
@@ -30,16 +30,18 @@ class SerializableWritable[T <: Writable](@transient var t: T) extends Serializa
   def value = t
   override def toString = t.toString
 
-  private def writeObject(out: ObjectOutputStream) {
+  protected def writeObject(out: ObjectOutputStream) {
     out.defaultWriteObject()
     new ObjectWritable(t).write(out)
   }
 
-  private def readObject(in: ObjectInputStream) {
+  protected def readObject(in: ObjectInputStream) {
     in.defaultReadObject()
     val ow = new ObjectWritable()
-    ow.setConf(new Configuration())
-    ow.readFields(in)
+    SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+      ow.setConf(SparkHadoopUtil.newConfiguration())
+      ow.readFields(in) // not thread safe
+    }
     t = ow.get().asInstanceOf[T]
   }
 }
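
For readers skimming the change: before this commit, readObject built a bare new Configuration() and called ObjectWritable.readFields without any locking, which can race when many tasks deserialize Hadoop writables at once (SPARK-1097, HADOOP-10456). The sketch below is illustrative only and not part of the commit; it simply round-trips a SerializableWritable-wrapped Hadoop Text from several threads, which is the kind of concurrent deserialization the new synchronized block is meant to make safe.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import org.apache.hadoop.io.Text
import org.apache.spark.SerializableWritable

// Illustrative sketch only: concurrent Java-serialization round-trips of a
// SerializableWritable. After this commit, each readObject() builds its
// Configuration under SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK
// instead of racing on the Configuration constructor.
object SerializableWritableRoundTrip {
  private def serialize(w: SerializableWritable[Text]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(w)
    out.close()
    bytes.toByteArray
  }

  private def deserialize(data: Array[Byte]): SerializableWritable[Text] = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    val result = in.readObject().asInstanceOf[SerializableWritable[Text]]
    in.close()
    result
  }

  def main(args: Array[String]): Unit = {
    val payload = serialize(new SerializableWritable(new Text("hello")))
    // Many threads deserializing at once is where the old race lived.
    val threads = (1 to 8).map { _ =>
      new Thread(new Runnable {
        override def run(): Unit = println(deserialize(payload).value)
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
  }
}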

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -569,7 +569,7 @@ class SparkContext(config: SparkConf) extends Logging {
     val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
     new HadoopRDD(
       this,
-      new SerializableWritable(hadoopConfiguration),
+      hadoopConfiguration,
       Some(setInputPathsFunc),
       inputFormatClass,
       keyClass,

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 14 additions & 0 deletions
@@ -124,6 +124,20 @@ class SparkHadoopUtil extends Logging {
 }
 
 object SparkHadoopUtil {
+  /**
+   * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
+   * Therefore, we synchronize on this lock before calling new JobConf() or new Configuration().
+   */
+  val CONFIGURATION_INSTANTIATION_LOCK = new Object()
+
+  /**
+   * Create a new Configuration in thread-safe way
+   */
+  def newConfiguration(): Configuration = {
+    CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+      new Configuration()
+    }
+  }
 
   private val hadoop = {
     val yarnMode = java.lang.Boolean.valueOf(
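
A hedged usage sketch (not part of the commit; the object and method names below are illustrative): callers that previously wrote new Configuration() directly can now go through SparkHadoopUtil.newConfiguration(), and code that must build a JobConf can take the same shared lock, which is the pattern HadoopRDD.getJobConf() adopts further down in this commit.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.deploy.SparkHadoopUtil

// Illustrative sketch only: two thread-safe ways to obtain Hadoop conf
// objects using the helpers added in this commit.
object ConfFactory {
  // The helper synchronizes internally on CONFIGURATION_INSTANTIATION_LOCK.
  def plainConf(): Configuration = SparkHadoopUtil.newConfiguration()

  // For a JobConf, take the same lock around the constructor call.
  def jobConf(base: Configuration): JobConf =
    SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
      new JobConf(base)
    }
}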

core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala

Lines changed: 0 additions & 1 deletion
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark._
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 
 private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 11 additions & 51 deletions
@@ -39,7 +39,6 @@ import org.apache.hadoop.util.ReflectionUtils
 
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.{DataReadMethod, InputMetrics}
 import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
@@ -86,7 +85,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
  * [[org.apache.spark.SparkContext.hadoopRDD()]]
  *
  * @param sc The SparkContext to associate the RDD with.
- * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
+ * @param conf A general Hadoop Configuration, or a subclass of it. If the enclosed
  * variabe references an instance of JobConf, then that JobConf will be used for the Hadoop job.
  * Otherwise, a new JobConf will be created on each slave using the enclosed Configuration.
  * @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD
@@ -99,14 +98,17 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
 @DeveloperApi
 class HadoopRDD[K, V](
     sc: SparkContext,
-    conf: SerializableWritable[Configuration],
+    @transient conf: Configuration,
     initLocalJobConfFuncOpt: Option[JobConf => Unit],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
     minPartitions: Int)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
+  // The serializable configuration
+  private val sConf = new SerializableWritable(conf)
+
   def this(
       sc: SparkContext,
       conf: JobConf,
@@ -116,63 +118,27 @@ class HadoopRDD[K, V](
       minPartitions: Int) = {
     this(
       sc,
-      new SerializableWritable(conf),
+      conf,
       None /* initLocalJobConfFuncOpt */,
       inputFormatClass,
       keyClass,
       valueClass,
       minPartitions)
   }
 
-  protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
-
   protected val inputFormatCacheKey = "rdd_%d_input_format".format(id)
 
   // used to build JobTracker ID
   private val createTime = new Date()
 
-  private val shouldCloneJobConf = sc.conf.get("spark.hadoop.cloneConf", "false").toBoolean
-
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
-  protected def getJobConf(): JobConf = {
-    val conf: Configuration = this.conf.value
-    if (shouldCloneJobConf) {
-      // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
-      // one job modifies a configuration while another reads it (SPARK-2546). This problem occurs
-      // somewhat rarely because most jobs treat the configuration as though it's immutable. One
-      // solution, implemented here, is to clone the Configuration object. Unfortunately, this
-      // clone can be very expensive. To avoid unexpected performance regressions for workloads and
-      // Hadoop versions that do not suffer from these thread-safety issues, this cloning is
-      // disabled by default.
-      HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
-        logDebug("Cloning Hadoop Configuration")
-        val newJobConf = new JobConf(conf)
-        if (!conf.isInstanceOf[JobConf]) {
-          initLocalJobConfFuncOpt.map(f => f(newJobConf))
-        }
+  protected def getJobConf(): JobConf = sConf.value match {
+    case jobConf: JobConf => jobConf
+    case c => SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK synchronized {
+      val newJobConf = new JobConf(c)
+      initLocalJobConfFuncOpt.map(f => f(newJobConf))
       newJobConf
     }
-    } else {
-      if (conf.isInstanceOf[JobConf]) {
-        logDebug("Re-using user-broadcasted JobConf")
-        conf.asInstanceOf[JobConf]
-      } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
-        logDebug("Re-using cached JobConf")
-        HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
-      } else {
-        // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
-        // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
-        // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
-        // Synchronize to prevent ConcurrentModificationException (SPARK-1097, HADOOP-10456).
-        HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
-          logDebug("Creating new JobConf and caching it for later re-use")
-          val newJobConf = new JobConf(conf)
-          initLocalJobConfFuncOpt.map(f => f(newJobConf))
-          HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
-          newJobConf
-        }
-      }
-    }
   }
 
   protected def getInputFormat(conf: JobConf): InputFormat[K, V] = {
@@ -295,12 +261,6 @@ class HadoopRDD[K, V](
 }
 
 private[spark] object HadoopRDD extends Logging {
-  /**
-   * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
-   * Therefore, we synchronize on this lock before calling new JobConf() or new Configuration().
-   */
-  val CONFIGURATION_INSTANTIATION_LOCK = new Object()
-
   /**
    * The three methods below are helpers for accessing the local map, a property of the SparkEnv of
    * the local process.
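
With the JobConf cache and the spark.hadoop.cloneConf path removed, getJobConf() reduces to a two-case pattern match: reuse the deserialized value if it is already a JobConf, otherwise build one under the shared lock and run the optional initializer. A standalone sketch of that shape (the object and method names below are hypothetical, not from the commit):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.deploy.SparkHadoopUtil

// Illustrative sketch only: the shape of the simplified getJobConf() logic,
// extracted into a free-standing helper with hypothetical names.
object JobConfResolver {
  def resolve(
      conf: Configuration,
      initLocalJobConfFuncOpt: Option[JobConf => Unit]): JobConf = conf match {
    // The configuration is already a JobConf: reuse it as-is.
    case jobConf: JobConf => jobConf
    // Plain Configuration: wrap it in a JobConf under the shared lock, then
    // let the optional initializer (e.g. setting input paths) touch it.
    case c => SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
      val newJobConf = new JobConf(c)
      initLocalJobConfFuncOpt.foreach(f => f(newJobConf))
      newJobConf
    }
  }
}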

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 0 additions & 1 deletion
@@ -74,7 +74,6 @@ class NewHadoopRDD[K, V](
   with Logging {
 
   private val sConf = new SerializableWritable(conf)
-  // private val serializableConf = new SerializableWritable(conf)
 
   private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")

sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala

Lines changed: 5 additions & 6 deletions
@@ -31,7 +31,6 @@ import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
 
 import org.apache.spark.SerializableWritable
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.expressions._
 
@@ -64,7 +63,7 @@ class HadoopTableReader(
 
   // TODO: set aws s3 credentials.
 
-  private val conf = new SerializableWritable(hiveExtraConf)
+  private val conf: SerializableWritable[Configuration] = new SerializableWritable(hiveExtraConf)
 
   override def makeRDDForTable(hiveTable: HiveTable): RDD[Row] =
     makeRDDForTable(
@@ -157,7 +156,7 @@ class HadoopTableReader(
 
     // Create local references so that the outer object isn't serialized.
     val tableDesc = relation.tableDesc
-    val broadcastedHiveConf = _broadcastedHiveConf
+    val _conf = conf
     val localDeserializer = partDeserializer
     val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
 
@@ -179,7 +178,7 @@ class HadoopTableReader(
       fillPartitionKeys(partValues, mutableRow)
 
       createHadoopRdd(tableDesc, inputPathStr, ifc).mapPartitions { iter =>
-        val hconf = broadcastedHiveConf.value.value
+        val hconf = _conf.value
         val deserializer = localDeserializer.newInstance()
         deserializer.initialize(hconf, partProps)
 
@@ -211,7 +210,7 @@ class HadoopTableReader(
   }
 
   /**
-   * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * Creates a HadoopRDD based on the HiveConf and other job properties that will be
    * applied locally on each slave.
    */
   private def createHadoopRdd(
@@ -223,7 +222,7 @@ class HadoopTableReader(
 
     val rdd = new HadoopRDD(
       sc.sparkContext,
-      _broadcastedHiveConf.asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
+      conf.value,
      Some(initializeJobConfFunc),
      inputFormatClass,
      classOf[Writable],
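
One small point worth calling out in this hunk: `val _conf = conf` copies the field into a local before the mapPartitions closure, so only the SerializableWritable is serialized rather than the whole HadoopTableReader. A minimal sketch of the same pattern with hypothetical names:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Illustrative sketch only: copy a field into a local val before using it in
// a closure, so Spark serializes just that value and not the enclosing object
// (which here holds a non-serializable SparkContext).
class Multiplier(sc: SparkContext, factor: Int) {
  def scale(rdd: RDD[Int]): RDD[Int] = {
    val localFactor = factor // local reference; `this` is not captured
    rdd.map(_ * localFactor)
  }
}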

0 commit comments

Comments
 (0)