[SPARK-8121] [SQL] Fixes InsertIntoHadoopFsRelation job initialization for Hadoop 1.x

For Hadoop 1.x, the `TaskAttemptContext` constructor clones the `Configuration` argument, so configuration done in `HadoopFsRelation.prepareJobForWrite()` is not propagated to the *driver*-side `TaskAttemptContext` (executor-side configurations are propagated properly). Currently this should only affect the Parquet output committer class configuration.
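
Below is a minimal, self-contained sketch of the failure mode. A plain `Configuration` copy stands in for the clone performed by the Hadoop 1.x `TaskAttemptContext` constructor, and the property name is the one this patch cares about; the object and the committer class name are purely illustrative.

import org.apache.hadoop.conf.Configuration

object CloneOrderingSketch {
  def main(args: Array[String]): Unit = {
    val jobConf = new Configuration()

    // A Hadoop 1.x TaskAttemptContext effectively does this at construction time:
    // it snapshots the configuration it is handed.
    val contextConf = new Configuration(jobConf)

    // Configuration done afterwards (e.g. in prepareJobForWrite) is visible in the
    // job configuration but not in the snapshot held by the task attempt context.
    jobConf.set("spark.sql.sources.outputCommitterClass", "com.example.SomeCommitter")

    println(jobConf.get("spark.sql.sources.outputCommitterClass"))     // com.example.SomeCommitter
    println(contextConf.get("spark.sql.sources.outputCommitterClass")) // null
  }
}

Constructing the `TaskAttemptContext` only after `prepareJobForWrite(job)` has decorated the job configuration avoids taking the snapshot too early, which is exactly what the change in `BaseWriterContainer.driverSideSetup()` below does.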

Author: Cheng Lian <lian@databricks.com>

Closes apache#6669 from liancheng/spark-8121 and squashes the following commits:

73819e8 [Cheng Lian] Minor logging fix
fce089c [Cheng Lian] Adds more logging
b6f78a6 [Cheng Lian] Fixes compilation error introduced while rebasing
963a1aa [Cheng Lian] Addresses @yhuai's comment
c3a0b1a [Cheng Lian] Fixes InsertIntoHadoopFsRelation job initialization
liancheng authored and yhuai committed Jun 8, 2015
1 parent ed5c2dc commit bbdfc0a
Showing 4 changed files with 65 additions and 13 deletions.
1 change: 1 addition & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -76,6 +76,7 @@ private[spark] object SQLConf {

// The output committer class used by FSBasedRelation. The specified class needs to be a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
// NOTE: This property should be set in Hadoop `Configuration` rather than Spark `SQLConf`
val OUTPUT_COMMITTER_CLASS = "spark.sql.sources.outputCommitterClass"

// Whether to perform eager analysis when constructing a dataframe.
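As the new NOTE above says, this property is read from the Hadoop `Configuration` (see the `newOutputCommitter` hunk further down), not from `SQLConf`, so a caller would set it roughly as in the sketch below. The sketch assumes an existing `sqlContext: SQLContext`, and `FileOutputCommitter` is only an example of a valid `org.apache.hadoop.mapreduce.OutputCommitter` subclass.

import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

// Set on the Hadoop Configuration rather than via sqlContext.setConf.
sqlContext.sparkContext.hadoopConfiguration.set(
  "spark.sql.sources.outputCommitterClass",
  classOf[FileOutputCommitter].getCanonicalName)
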
@@ -212,6 +212,13 @@ private[sql] class ParquetRelation2(
classOf[ParquetOutputCommitter],
classOf[ParquetOutputCommitter])

if (conf.get("spark.sql.parquet.output.committer.class") == null) {
logInfo("Using default output committer for Parquet: " +
classOf[ParquetOutputCommitter].getCanonicalName)
} else {
logInfo("Using user defined output committer for Parquet: " + committerClass.getCanonicalName)
}

conf.setClass(
SQLConf.OUTPUT_COMMITTER_CLASS,
committerClass,
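The logging added above distinguishes the default Parquet committer from a user-supplied one. A user-supplied committer is configured through the Hadoop property checked in this hunk and, per the `conf.getClass` call, must be a subclass of `org.apache.parquet.hadoop.ParquetOutputCommitter`. The sketch below assumes an existing `sqlContext`; `DirectParquetOutputCommitter` is the committer exercised by the SPARK-6352 test later in this diff, and its fully qualified name here is an assumption.

// Route Parquet writes through a different committer, e.g. one that skips _temporary.
sqlContext.sparkContext.hadoopConfiguration.set(
  "spark.sql.parquet.output.committer.class",
  "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
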
@@ -297,12 +297,16 @@ private[sql] abstract class BaseWriterContainer(
def driverSideSetup(): Unit = {
setupIDs(0, 0, 0)
setupConf()
taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)

// This preparation must happen before initializing output format and output committer, since
// their initialization involves the job configuration, which can be potentially decorated in
// `relation.prepareJobForWrite`.
// Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor
// clones the Configuration object passed in. If we initialize the TaskAttemptContext first,
// configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext.
//
// Also, the `prepareJobForWrite` call must happen before initializing output format and output
// committer, since their initialization involve the job configuration, which can be potentially
// decorated in `prepareJobForWrite`.
outputWriterFactory = relation.prepareJobForWrite(job)
taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)

outputFormatClass = job.getOutputFormatClass
outputCommitter = newOutputCommitter(taskAttemptContext)
@@ -331,6 +335,8 @@ private[sql] abstract class BaseWriterContainer(
SQLConf.OUTPUT_COMMITTER_CLASS, null, classOf[OutputCommitter])

Option(committerClass).map { clazz =>
logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")

// Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
// has an associated output committer. To override this output committer,
// we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
@@ -350,7 +356,9 @@
}.getOrElse {
// If output committer class is not set, we will use the one associated with the
// file output format.
outputFormatClass.newInstance().getOutputCommitter(context)
val outputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
logInfo(s"Using output committer class ${outputCommitter.getClass.getCanonicalName}")
outputCommitter
}
}

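For illustration, a committer plugged in through `SQLConf.OUTPUT_COMMITTER_CLASS` could look like the hypothetical sketch below. The `(Path, TaskAttemptContext)` constructor shape mirrors the `BogusParquetOutputCommitter` test class further down; the class itself is not part of this patch.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

// Hypothetical committer that logs job commits and otherwise behaves like
// the stock FileOutputCommitter.
class LoggingFileOutputCommitter(outputPath: Path, context: TaskAttemptContext)
  extends FileOutputCommitter(outputPath, context) {

  override def commitJob(jobContext: JobContext): Unit = {
    println(s"Committing job ${jobContext.getJobID} to $outputPath")
    super.commitJob(jobContext)
  }
}
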
@@ -23,16 +23,18 @@ import scala.reflect.runtime.universe.TypeTag

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.scalatest.BeforeAndAfterAll
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.parquet.example.data.simple.SimpleGroup
import org.apache.parquet.example.data.{Group, GroupWriter}
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
import org.apache.parquet.hadoop.metadata.{ParquetMetadata, FileMetaData, CompressionCodecName}
import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetWriter}
import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata}
import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetOutputCommitter, ParquetWriter}
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.{MessageType, MessageTypeParser}
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.util.DateUtils
@@ -196,7 +198,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {

withParquetDataFrame(allNulls :: Nil) { df =>
val rows = df.collect()
assert(rows.size === 1)
assert(rows.length === 1)
assert(rows.head === Row(Seq.fill(5)(null): _*))
}
}
@@ -209,7 +211,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {

withParquetDataFrame(allNones :: Nil) { df =>
val rows = df.collect()
assert(rows.size === 1)
assert(rows.length === 1)
assert(rows.head === Row(Seq.fill(3)(null): _*))
}
}
@@ -379,6 +381,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
}

test("SPARK-6352 DirectParquetOutputCommitter") {
val clonedConf = new Configuration(configuration)

// Write to a parquet file and let it fail.
// _temporary should be missing if direct output committer works.
try {
@@ -393,14 +397,46 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
val fs = path.getFileSystem(configuration)
assert(!fs.exists(path))
}
} finally {
// Hadoop 1 doesn't have `Configuration.unset`
configuration.clear()
clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
}
finally {
configuration.set("spark.sql.parquet.output.committer.class",
"org.apache.parquet.hadoop.ParquetOutputCommitter")
}

test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overriden") {
withTempPath { dir =>
val clonedConf = new Configuration(configuration)

configuration.set(
SQLConf.OUTPUT_COMMITTER_CLASS, classOf[ParquetOutputCommitter].getCanonicalName)

configuration.set(
"spark.sql.parquet.output.committer.class",
classOf[BogusParquetOutputCommitter].getCanonicalName)

try {
val message = intercept[SparkException] {
sqlContext.range(0, 1).write.parquet(dir.getCanonicalPath)
}.getCause.getMessage
assert(message === "Intentional exception for testing purposes")
} finally {
// Hadoop 1 doesn't have `Configuration.unset`
configuration.clear()
clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
}
}
}
}

class BogusParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
extends ParquetOutputCommitter(outputPath, context) {

override def commitJob(jobContext: JobContext): Unit = {
sys.error("Intentional exception for testing purposes")
}
}
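
Both tests above snapshot and restore the whole Hadoop configuration because Hadoop 1 has no `Configuration.unset`. A hypothetical helper (not part of this patch) capturing that clone/clear/replay idiom might look like:

import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration

object ConfigurationRestore {
  // Snapshot the configuration, run the body, then clear the configuration and
  // replay the snapshot, mirroring the try/finally blocks in the tests above.
  def withRestoredConfiguration[T](conf: Configuration)(body: => T): T = {
    val snapshot = new Configuration(conf)
    try body finally {
      conf.clear()
      snapshot.asScala.foreach(entry => conf.set(entry.getKey, entry.getValue))
    }
  }
}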

class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterAll {
private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
