
Commit 43544c4

Do not use a custom output committer when appending data.

1 parent 0f92be5 commit 43544c4
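For context, the scenario this commit guards against looks roughly like the sketch below. It is illustrative only: `sc`, `sqlContext`, `df`, the path, and the DirectParquetOutputCommitter class name are placeholders and assumptions, not taken from this commit; the configuration key is the one behind SQLConf.OUTPUT_COMMITTER_CLASS.

import org.apache.spark.sql.SaveMode

// A custom output committer is configured globally for data source writes
// (illustrative committer class; any class set through this key is affected).
sc.hadoopConfiguration.set(
  "spark.sql.sources.outputCommitterClass",
  "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

// The first write creates the destination directory.
df.write.mode(SaveMode.Append).parquet("/tmp/events")

// The second write appends to a directory that already contains data. After this
// commit, the configured committer is ignored for this write and the output format's
// own committer is used instead, because a failed append with a direct committer
// could leave partial files next to the existing data.
df.write.mode(SaveMode.Append).parquet("/tmp/events")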

File tree

2 files changed: +136 −36 lines changed


sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala

Lines changed: 54 additions & 35 deletions
@@ -96,7 +96,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
     val fs = outputPath.getFileSystem(hadoopConf)
     val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)

-    val doInsertion = (mode, fs.exists(qualifiedOutputPath)) match {
+    val pathExists = fs.exists(qualifiedOutputPath)
+    val doInsertion = (mode, pathExists) match {
       case (SaveMode.ErrorIfExists, true) =>
         sys.error(s"path $qualifiedOutputPath already exists.")
       case (SaveMode.Overwrite, true) =>
@@ -107,6 +108,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
       case (SaveMode.Ignore, exists) =>
         !exists
     }
+    // If we are appending data to an existing dir.
+    val isAppend = pathExists && (mode == SaveMode.Append)

     if (doInsertion) {
       val job = new Job(hadoopConf)
@@ -130,10 +133,10 @@ private[sql] case class InsertIntoHadoopFsRelation(

       val partitionColumns = relation.partitionColumns.fieldNames
       if (partitionColumns.isEmpty) {
-        insert(new DefaultWriterContainer(relation, job), df)
+        insert(new DefaultWriterContainer(relation, job, isAppend), df)
       } else {
         val writerContainer = new DynamicPartitionWriterContainer(
-          relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME)
+          relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME, isAppend)
         insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns)
       }
     }
@@ -277,7 +280,8 @@ private[sql] case class InsertIntoHadoopFsRelation(

 private[sql] abstract class BaseWriterContainer(
     @transient val relation: HadoopFsRelation,
-    @transient job: Job)
+    @transient job: Job,
+    isAppend: Boolean)
   extends SparkHadoopMapReduceUtil
   with Logging
   with Serializable {
@@ -356,34 +360,47 @@ private[sql] abstract class BaseWriterContainer(
   }

   private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
-    val committerClass = context.getConfiguration.getClass(
-      SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
-
-    Option(committerClass).map { clazz =>
-      logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
-
-      // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
-      // has an associated output committer. To override this output committer,
-      // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
-      // If a data source needs to override the output committer, it needs to set the
-      // output committer in prepareForWrite method.
-      if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
-        // The specified output committer is a FileOutputCommitter.
-        // So, we will use the FileOutputCommitter-specified constructor.
-        val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-        ctor.newInstance(new Path(outputPath), context)
-      } else {
-        // The specified output committer is just an OutputCommitter.
-        // So, we will use the no-argument constructor.
-        val ctor = clazz.getDeclaredConstructor()
-        ctor.newInstance()
+    val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
+
+    if (isAppend) {
+      // If we are appending data to an existing dir, we will only use the output committer
+      // associated with the file output format since it is not safe to use a custom
+      // committer for appending. For example, in S3, a direct parquet output committer may
+      // leave partial data in the destination dir when the appending job fails.
+      logInfo(
+        s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " +
+          "for appending.")
+      defaultOutputCommitter
+    } else {
+      val committerClass = context.getConfiguration.getClass(
+        SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
+
+      Option(committerClass).map { clazz =>
+        logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
+
+        // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
+        // has an associated output committer. To override this output committer,
+        // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
+        // If a data source needs to override the output committer, it needs to set the
+        // output committer in prepareForWrite method.
+        if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
+          // The specified output committer is a FileOutputCommitter.
+          // So, we will use the FileOutputCommitter-specified constructor.
+          val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+          ctor.newInstance(new Path(outputPath), context)
+        } else {
+          // The specified output committer is just an OutputCommitter.
+          // So, we will use the no-argument constructor.
+          val ctor = clazz.getDeclaredConstructor()
+          ctor.newInstance()
+        }
+      }.getOrElse {
+        // If the output committer class is not set, we will use the one associated with the
+        // file output format.
+        logInfo(
+          s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}")
+        defaultOutputCommitter
       }
-    }.getOrElse {
-      // If the output committer class is not set, we will use the one associated with the
-      // file output format.
-      val outputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
-      logInfo(s"Using output committer class ${outputCommitter.getClass.getCanonicalName}")
-      outputCommitter
     }
   }

@@ -433,8 +450,9 @@ private[sql] abstract class BaseWriterContainer(

 private[sql] class DefaultWriterContainer(
     @transient relation: HadoopFsRelation,
-    @transient job: Job)
-  extends BaseWriterContainer(relation, job) {
+    @transient job: Job,
+    isAppend: Boolean)
+  extends BaseWriterContainer(relation, job, isAppend) {

   @transient private var writer: OutputWriter = _

@@ -473,8 +491,9 @@ private[sql] class DynamicPartitionWriterContainer(
     @transient relation: HadoopFsRelation,
     @transient job: Job,
     partitionColumns: Array[String],
-    defaultPartitionName: String)
-  extends BaseWriterContainer(relation, job) {
+    defaultPartitionName: String,
+    isAppend: Boolean)
+  extends BaseWriterContainer(relation, job, isAppend) {

   // All output writers are created on executor side.
   @transient protected var outputWriters: mutable.Map[String, OutputWriter] = _
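Stripped of the surrounding BaseWriterContainer plumbing, the committer-selection rule introduced above boils down to the sketch below. It is a paraphrase for readability, not the verbatim Spark source; the standalone-function shape, the parameter names, and the default key string are my assumptions.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{OutputCommitter, OutputFormat, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter}

// Sketch of the committer choice: on append, always fall back to the format's own committer.
def chooseOutputCommitter(
    context: TaskAttemptContext,
    outputFormat: OutputFormat[_, _],
    outputPath: String,
    isAppend: Boolean,
    committerKey: String = "spark.sql.sources.outputCommitterClass"): OutputCommitter = {
  // The committer the file output format would use on its own.
  val defaultCommitter = outputFormat.getOutputCommitter(context)

  if (isAppend) {
    // Appending to an existing dir: ignore any user-configured committer, since a custom
    // (e.g. direct) committer may leave partial data behind if the appending job fails.
    defaultCommitter
  } else {
    val committerClass = context.getConfiguration.getClass(
      committerKey, null, classOf[OutputCommitter])
    Option(committerClass).map { clazz =>
      if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
        // FileOutputCommitter subclasses are built with (Path, TaskAttemptContext).
        clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
          .newInstance(new Path(outputPath), context)
      } else {
        // Any other OutputCommitter is built with its no-argument constructor.
        clazz.getDeclaredConstructor().newInstance()
      }
    }.getOrElse(defaultCommitter)
  }
}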

sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala

Lines changed: 82 additions & 1 deletion
@@ -17,10 +17,16 @@

 package org.apache.spark.sql.sources

+import scala.collection.JavaConversions._
+
 import java.io.File

 import com.google.common.io.Files
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.parquet.hadoop.ParquetOutputCommitter

 import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -476,7 +482,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
   // more cores, the issue can be reproduced steadily. Fortunately our Jenkins builder meets this
   // requirement. We probably want to move this test case to spark-integration-tests or spark-perf
   // later.
-  test("SPARK-8406: Avoids name collision while writing Parquet files") {
+  test("SPARK-8406: Avoids name collision while writing files") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath
       sqlContext
@@ -497,6 +503,81 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  test("SPARK-8578 specified custom output committer will not be used to append data") {
+    val clonedConf = new Configuration(configuration)
+    try {
+      val df = sqlContext.range(1, 10).toDF("i")
+      withTempPath { dir =>
+        df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+        configuration.set(
+          SQLConf.OUTPUT_COMMITTER_CLASS.key,
+          classOf[AlwaysFailOutputCommitter].getName)
+        // Since Parquet has its own output committer setting, also set it
+        // to AlwaysFailParquetOutputCommitter here.
+        configuration.set("spark.sql.parquet.output.committer.class",
+          classOf[AlwaysFailParquetOutputCommitter].getName)
+        // Because the data already exists,
+        // this append should succeed because we will use the output committer associated
+        // with the file output format and AlwaysFailOutputCommitter will not be used.
+        df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+        checkAnswer(
+          sqlContext.read
+            .format(dataSourceName)
+            .option("dataSchema", df.schema.json)
+            .load(dir.getCanonicalPath),
+          df.unionAll(df))
+
+        // This will fail because AlwaysFailOutputCommitter is used when we are not appending.
+        intercept[Exception] {
+          df.write.mode("overwrite").format(dataSourceName).save(dir.getCanonicalPath)
+        }
+      }
+      withTempPath { dir =>
+        configuration.set(
+          SQLConf.OUTPUT_COMMITTER_CLASS.key,
+          classOf[AlwaysFailOutputCommitter].getName)
+        // Since Parquet has its own output committer setting, also set it
+        // to AlwaysFailParquetOutputCommitter here.
+        configuration.set("spark.sql.parquet.output.committer.class",
+          classOf[AlwaysFailParquetOutputCommitter].getName)
+        // Because there is no existing data,
+        // this append will fail because AlwaysFailOutputCommitter is used when we do append
+        // and there is no existing data.
+        intercept[Exception] {
+          df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+        }
+      }
+    } finally {
+      // Hadoop 1 doesn't have `Configuration.unset`
+      configuration.clear()
+      clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+    }
+  }
+}
+
+// This class is used to test SPARK-8578. We should not use any custom output committer when
+// we actually append data to an existing dir.
+class AlwaysFailOutputCommitter(
+    outputPath: Path,
+    context: TaskAttemptContext)
+  extends FileOutputCommitter(outputPath, context) {
+
+  override def commitJob(context: JobContext): Unit = {
+    sys.error("Intentional job commitment failure for testing purpose.")
+  }
+}
+
+// This class is used to test SPARK-8578. We should not use any custom output committer when
+// we actually append data to an existing dir.
+class AlwaysFailParquetOutputCommitter(
+    outputPath: Path,
+    context: TaskAttemptContext)
+  extends ParquetOutputCommitter(outputPath, context) {
+
+  override def commitJob(context: JobContext): Unit = {
+    sys.error("Intentional job commitment failure for testing purpose.")
+  }
 }

 class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
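The two AlwaysFail* classes above also show the shape a committer registered through SQLConf.OUTPUT_COMMITTER_CLASS is expected to have: a FileOutputCommitter (or plain OutputCommitter) subclass, with a (Path, TaskAttemptContext) constructor in the FileOutputCommitter case so it can be instantiated reflectively. A non-failing variant might look like the sketch below; it is illustrative only, and the class name and log line are mine.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

// Illustrative committer in the expected shape; unlike the test classes above it
// delegates to the normal commit instead of failing on purpose.
class LoggingOutputCommitter(outputPath: Path, context: TaskAttemptContext)
  extends FileOutputCommitter(outputPath, context) {

  override def commitJob(jobContext: JobContext): Unit = {
    println(s"Committing job ${jobContext.getJobID} to $outputPath")
    super.commitJob(jobContext)
  }
}

// Registered via the Hadoop configuration; after this commit it is only honoured when
// the write is not appending to an existing directory:
//   sc.hadoopConfiguration.set(
//     "spark.sql.sources.outputCommitterClass",
//     classOf[LoggingOutputCommitter].getName)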
