
Commit 1f42295

tdas authored and zsxwing committed
[SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles
The JobConf object created in `DStream.saveAsHadoopFiles` is used concurrently in multiple places:

* The JobConf is updated by `RDD.saveAsHadoopFile()` before the job is launched.
* The JobConf is serialized as part of the DStream checkpoints.

These concurrent accesses (updating in one thread while another thread serializes it) can lead to a ConcurrentModificationException in the underlying Java hashmap used by the internal Hadoop Configuration object.

The solution is to create a new JobConf in every batch; that copy is updated by `RDD.saveAsHadoopFile()`, while checkpointing serializes the original JobConf. Tests to be added in #9988 will fail reliably without this patch. The patch is kept deliberately small so that it can be backported to previous branches.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #10088 from tdas/SPARK-12087.

(cherry picked from commit 8a75a30)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
1 parent a5743af commit 1f42295
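
For illustration, the race described above is easy to sketch outside Spark. The following standalone snippet is a hypothetical demo, not Spark source: the object name, helper, and config keys are invented. One thread repeatedly serializes a shared JobConf via Configuration.write, which iterates the conf's underlying entries, while another thread keeps calling set() on the same object; depending on timing this can surface as the ConcurrentModificationException the commit message describes.

import java.io.{ByteArrayOutputStream, DataOutputStream}
import org.apache.hadoop.mapred.JobConf

// Hypothetical demo (not Spark source): names and keys are invented.
object JobConfRaceSketch {
  private def thread(body: => Unit): Thread =
    new Thread(new Runnable { def run(): Unit = body })

  def main(args: Array[String]): Unit = {
    val conf = new JobConf() // one shared conf, as before the patch

    // Stand-in for checkpoint serialization: Configuration.write iterates the
    // conf's underlying entries while writing them out.
    val serializer = thread {
      try {
        for (_ <- 1 to 10000) {
          conf.write(new DataOutputStream(new ByteArrayOutputStream()))
        }
      } catch {
        case e: java.util.ConcurrentModificationException =>
          println(s"race observed: $e") // may or may not trigger on a given run
      }
    }

    // Stand-in for RDD.saveAsHadoopFile, which sets keys on the conf it is given.
    val mutator = thread {
      for (i <- 1 to 10000) conf.set(s"demo.key.$i", i.toString)
    }

    serializer.start(); mutator.start()
    serializer.join(); mutator.join()
  }
}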

File tree

1 file changed: +2 −1 lines changed


streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala

Lines changed: 2 additions & 1 deletion
@@ -730,7 +730,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
     val serializableConf = new SerializableJobConf(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value)
+      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass,
+        new JobConf(serializableConf.value))
     }
     self.foreachRDD(saveFunc)
   }
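
The one-line fix works because JobConf has a copy constructor, JobConf(Configuration), that copies the entries into a fresh object: each batch's save job then mutates its own private copy while checkpointing serializes the untouched original. A minimal sketch of the pattern follows; it is a hypothetical demo, with the object name and config keys invented for illustration.

import org.apache.hadoop.mapred.JobConf

// Hypothetical demo (not Spark source): names and keys are invented.
object JobConfCopyDemo {
  def main(args: Array[String]): Unit = {
    // The shared conf, standing in for the one captured by saveAsHadoopFiles
    // and serialized with the DStream checkpoint.
    val shared = new JobConf()
    shared.set("demo.output.dir", "/tmp/out")

    // Per-batch defensive copy: JobConf(Configuration) copies the entries.
    // This copy is what the patched code hands to RDD.saveAsHadoopFile.
    val perBatch = new JobConf(shared)
    perBatch.set("demo.batch.time", "1448886000000")

    // The shared conf never sees the per-batch mutation, so serializing it
    // concurrently no longer races with the save job.
    assert(shared.get("demo.batch.time") == null)
    println(s"per-batch value: ${perBatch.get("demo.batch.time")}")
  }
}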
