Commit e19a3bd

avoid speculative tasks writing to the same file

1 parent 4c5889e commit e19a3bd
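
Context, not part of the original commit message: with `spark.speculation=true` Spark can launch a second attempt of a slow task, and in the dynamic-partition writer both attempts previously computed the same target file under the job output path. A minimal sketch of that collision, using hypothetical paths and a hypothetical helper (targetFor); only org.apache.hadoop.fs.Path is a real API here:

import org.apache.hadoop.fs.Path

// Sketch of the pre-change path computation: the target file depends only
// on the job output path, the dynamic partition, and the output name, so
// two speculative attempts of the same task resolve to one file.
object CollidingPathsSketch {
  val outputPath = new Path("/warehouse/my_table") // hypothetical
  val dynamicPartPath = "/part=2015"               // hypothetical
  val outputName = "part-00000"                    // same for both attempts

  // Note: the attempt number is unused -- that is the race being fixed.
  def targetFor(attempt: Int): Path =
    new Path(new Path(outputPath, dynamicPartPath.stripPrefix("/")), outputName)

  def main(args: Array[String]): Unit = {
    assert(targetFor(0) == targetFor(1)) // attempts 0 and 1 collide
    println(targetFor(0))                // /warehouse/my_table/part=2015/part-00000
  }
}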

File tree

1 file changed: +10 -2 lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala

Lines changed: 10 additions & 2 deletions
@@ -68,7 +68,7 @@ private[hive] class SparkHiveWriterContainer(
   @transient private var writer: FileSinkOperator.RecordWriter = null
   @transient protected lazy val committer = conf.value.getOutputCommitter
   @transient protected lazy val jobContext = newJobContext(conf.value, jID.value)
-  @transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value)
+  @transient protected lazy val taskContext = newTaskAttemptContext(conf.value, taID.value)
   @transient private lazy val outputFormat =
     conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]

@@ -230,7 +230,15 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
     val path = {
       val outputPath = FileOutputFormat.getOutputPath(conf.value)
       assert(outputPath != null, "Undefined job output-path")
-      val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/"))
+      var workPath = outputPath
+      if (committer.isInstanceOf[FileOutputCommitter]) {
+        // use a path like $outputPath/_temporary/${attemptId}/
+        // to avoid writing to the same file when `spark.speculation=true`
+        workPath = committer
+          .asInstanceOf[FileOutputCommitter]
+          .getWorkPath(taskContext, outputPath)
+      }
+      workPath = new Path(workPath, dynamicPartPath.stripPrefix("/"))
       new Path(workPath, getOutputName)
     }
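
For context, an illustrative sketch, not part of this commit: FileOutputCommitter stages each task attempt under a distinct directory of the form $outputPath/_temporary/<attemptId>, which is what getWorkPath returns, so speculative attempts of the same task write to different files until one attempt is committed. The layout and attempt IDs below are assumptions for illustration; the exact structure varies by Hadoop version.

import org.apache.hadoop.fs.Path

// Illustrative model of the attempt-scoped work path. attemptWorkPath is a
// hypothetical stand-in for FileOutputCommitter.getWorkPath; the
// _temporary/<attemptId> layout is assumed, and the attempt IDs are made up.
object AttemptScopedPathsSketch {
  def attemptWorkPath(outputPath: Path, attemptId: String): Path =
    new Path(new Path(outputPath, "_temporary"), attemptId)

  def main(args: Array[String]): Unit = {
    val outputPath = new Path("/warehouse/my_table")
    val dynamicPartPath = "/part=2015"

    val attempts = Seq(
      "attempt_201501010000_0001_m_000000_0", // original attempt
      "attempt_201501010000_0001_m_000000_1") // speculative duplicate

    // Each attempt now lands in its own staging directory.
    for (id <- attempts) {
      val workPath = attemptWorkPath(outputPath, id)
      println(new Path(workPath, dynamicPartPath.stripPrefix("/")))
    }
  }
}

On task commit, the committer promotes only the winning attempt's staged files into the final output location, which is why routing writes through the taskContext-aware work path removes the race.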
