@@ -122,20 +122,54 @@ class HadoopRDD[K, V](
       minPartitions)
   }
 
+  protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
+
   protected val inputFormatCacheKey = "rdd_%d_input_format".format(id)
 
   // used to build JobTracker ID
   private val createTime = new Date()
 
+  private val shouldCloneJobConf = sc.conf.get("spark.hadoop.cloneConf", "false").toBoolean
+
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
   protected def getJobConf(): JobConf = {
     val conf: Configuration = broadcastedConf.value.value
-    HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
-      val newJobConf = new JobConf(conf)
-      if (!conf.isInstanceOf[JobConf]) {
-        initLocalJobConfFuncOpt.map(f => f(newJobConf))
+    if (shouldCloneJobConf) {
+      // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
+      // one job modifies a configuration while another reads it (SPARK-2546). This problem occurs
+      // somewhat rarely because most jobs treat the configuration as though it's immutable. One
+      // solution, implemented here, is to clone the Configuration object. Unfortunately, this
+      // clone can be very expensive. To avoid unexpected performance regressions for workloads and
+      // Hadoop versions that do not suffer from these thread-safety issues, this cloning is
+      // disabled by default.
+      HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+        logDebug("Cloning Hadoop Configuration")
+        val newJobConf = new JobConf(conf)
+        if (!conf.isInstanceOf[JobConf]) {
+          initLocalJobConfFuncOpt.map(f => f(newJobConf))
+        }
+        newJobConf
+      }
+    } else {
+      if (conf.isInstanceOf[JobConf]) {
+        logDebug("Re-using user-broadcasted JobConf")
+        conf.asInstanceOf[JobConf]
+      } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+        logDebug("Re-using cached JobConf")
+        HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+      } else {
+        // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
+        // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
+        // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
+        // Synchronize to prevent ConcurrentModificationException (SPARK-1097, HADOOP-10456).
+        HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+          logDebug("Creating new JobConf and caching it for later re-use")
+          val newJobConf = new JobConf(conf)
+          initLocalJobConfFuncOpt.map(f => f(newJobConf))
+          HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+          newJobConf
+        }
       }
-      newJobConf
     }
   }
 
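For context, the new cloning path is opt-in via the `spark.hadoop.cloneConf` key introduced in this diff. A minimal sketch of how a job might enable it; the app name, master, and input path below are placeholders, not from this change:

import org.apache.spark.{SparkConf, SparkContext}

object CloneConfExample {
  def main(args: Array[String]): Unit = {
    // Opt in to the defensive per-call cloning of the Hadoop Configuration.
    // The default is "false", which keeps the cheaper cached-JobConf path.
    val conf = new SparkConf()
      .setAppName("clone-conf-example") // placeholder app name
      .setMaster("local[2]")            // placeholder master
      .set("spark.hadoop.cloneConf", "true")
    val sc = new SparkContext(conf)

    // textFile() builds a HadoopRDD, so this read goes through getJobConf().
    val lines = sc.textFile("hdfs:///path/to/input") // placeholder path
    println(lines.count())
    sc.stop()
  }
}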
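The non-cloning branch leans on HadoopRDD.containsCachedMetadata, getCachedMetadata, and putCachedMetadata, which this hunk does not show. A rough sketch of the kind of process-local cache those helpers imply; the ConcurrentHashMap-backed MetadataCache object here is an illustration, not the actual companion-object code:

import java.util.concurrent.ConcurrentHashMap

// Illustrative stand-in for the process-local metadata cache used above.
// Keys are the per-RDD strings like "rdd_%d_job_conf"; values are the cached
// JobConf (or InputFormat) instances shared across getJobConf() calls, which
// avoids re-parsing the ~10KB of configuration state on every call.
object MetadataCache {
  private val cache = new ConcurrentHashMap[String, Any]()

  def containsCachedMetadata(key: String): Boolean = cache.containsKey(key)
  def getCachedMetadata(key: String): Any = cache.get(key)
  def putCachedMetadata(key: String, value: Any): Unit = { cache.put(key, value) }
}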