[SPARK-20236][SQL] dynamic partition overwrite #18714

Closed · wants to merge 2 commits
@@ -28,8 +28,9 @@ import org.apache.spark.util.Utils
*
* 1. Implementations must be serializable, as the committer instance instantiated on the driver
* will be used for tasks on executors.
* 2. Implementations should have a constructor with 2 arguments:
* (jobId: String, path: String)
* 2. Implementations should have a constructor with 2 or 3 arguments:
* (jobId: String, path: String) or
* (jobId: String, path: String, dynamicPartitionOverwrite: Boolean)
* 3. A committer should not be reused across multiple Spark jobs.
*
* The proper call sequence is:
@@ -139,10 +140,22 @@ object FileCommitProtocol {
/**
* Instantiates a FileCommitProtocol using the given className.
*/
def instantiate(className: String, jobId: String, outputPath: String)
: FileCommitProtocol = {
def instantiate(
className: String,
jobId: String,
outputPath: String,
dynamicPartitionOverwrite: Boolean = false): FileCommitProtocol = {
val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]]
val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
ctor.newInstance(jobId, outputPath)
// First try the constructor with arguments (jobId: String, outputPath: String,
// dynamicPartitionOverwrite: Boolean).
// If that doesn't exist, try the one with (jobId: String, outputPath: String).
try {
val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean])
ctor.newInstance(jobId, outputPath, dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean])
} catch {
case _: NoSuchMethodException =>
val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
ctor.newInstance(jobId, outputPath)
}
}
}
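
As a hedged illustration outside the patch itself, the sketch below shows what the relaxed constructor contract and the reflective lookup above allow; the committer class name and output path are made up for this example.

import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}

// Hypothetical committer honoring the new 3-argument constructor contract.
class AuditingCommitProtocol(
    jobId: String,
    path: String,
    dynamicPartitionOverwrite: Boolean = false)
  extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite)

// instantiate first looks for the (String, String, Boolean) constructor and falls
// back to (String, String) for committers written against the old contract.
val committer = FileCommitProtocol.instantiate(
  className = classOf[AuditingCommitProtocol].getName,
  jobId = java.util.UUID.randomUUID().toString,
  outputPath = "/tmp/output",               // hypothetical output path
  dynamicPartitionOverwrite = true)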
@@ -39,8 +39,19 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
*
* @param jobId the job's or stage's id
* @param path the job's output path, or null if committer acts as a noop
* @param dynamicPartitionOverwrite If true, Spark will overwrite partition directories at runtime
*                                  dynamically, i.e., we first write files under a staging
*                                  directory with the partition path, e.g.
*                                  /path/to/staging/a=1/b=1/xxx.parquet. When committing the job,
*                                  we first clean up the corresponding partition directories at
*                                  the destination path, e.g. /path/to/destination/a=1/b=1, and
*                                  move files from the staging directory to the corresponding
*                                  partition directories under the destination path.
*/
class HadoopMapReduceCommitProtocol(jobId: String, path: String)
class HadoopMapReduceCommitProtocol(
jobId: String,
path: String,
dynamicPartitionOverwrite: Boolean = false)
extends FileCommitProtocol with Serializable with Logging {

import FileCommitProtocol._
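
A hedged sketch of the staging layout described in the Scaladoc above, not part of the patch; the job id, partition values, and file name are illustrative, and the snippet only builds paths without touching any filesystem.

import org.apache.hadoop.fs.Path

val destination = new Path("/path/to/destination")
val jobId = "8c6b4f0e-0000-0000-0000-000000000000"   // illustrative UUID
val stagingDir = new Path(destination, ".spark-staging-" + jobId)

// While the job runs, a task writing into partition a=1/b=1 stages its file here:
val stagedFile = new Path(new Path(stagingDir, "a=1/b=1"), "part-00000.parquet")

// At commit time the matching partition directory under the destination is deleted
// and the staged partition directory is renamed into its place:
val finalPartitionDir = new Path(destination, "a=1/b=1")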
@@ -67,9 +78,17 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
@transient private var addedAbsPathFiles: mutable.Map[String, String] = null

/**
* The staging directory for all files committed with absolute output paths.
* Tracks partitions with default paths that have new files written into them by this task,
* e.g. a=1/b=2. Files under these partitions will be saved into the staging directory and
* moved to the destination directory at the end, if `dynamicPartitionOverwrite` is true.
*/
private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId)
@transient private var partitionPaths: mutable.Set[String] = null

/**
* The staging directory of this write job. Spark uses it to deal with files with absolute output
* paths, or with writing data into partitioned directories when dynamicPartitionOverwrite=true.
*/
private def stagingDir = new Path(path, ".spark-staging-" + jobId)

protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
val format = context.getOutputFormatClass.newInstance()
@@ -85,11 +104,16 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
val filename = getFilename(taskContext, ext)

val stagingDir: String = committer match {
val stagingDir: Path = committer match {
case _ if dynamicPartitionOverwrite =>
assert(dir.isDefined,
"The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
partitionPaths += dir.get
this.stagingDir
// FileOutputCommitter has its own staging path, called the "work path".
case f: FileOutputCommitter =>
Option(f.getWorkPath).map(_.toString).getOrElse(path)
case _ => path
new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
case _ => new Path(path)
}

dir.map { d =>
@@ -106,8 +130,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)

// Include a UUID here to prevent file collisions for one task writing to different dirs.
// In principle we could include hash(absoluteDir) instead but this is simpler.
val tmpOutputPath = new Path(
absPathStagingDir, UUID.randomUUID().toString() + "-" + filename).toString
val tmpOutputPath = new Path(stagingDir, UUID.randomUUID().toString() + "-" + filename).toString

addedAbsPathFiles(tmpOutputPath) = absOutputPath
tmpOutputPath
@@ -141,37 +164,57 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)

override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
committer.commitJob(jobContext)
val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]])
.foldLeft(Map[String, String]())(_ ++ _)
logDebug(s"Committing files staged for absolute locations $filesToMove")

if (hasValidPath) {
val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
val (allAbsPathFiles, allPartitionPaths) =
taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip
val fs = stagingDir.getFileSystem(jobContext.getConfiguration)

val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)
logDebug(s"Committing files staged for absolute locations $filesToMove")
if (dynamicPartitionOverwrite) {
val absPartitionPaths = filesToMove.values.map(new Path(_).getParent).toSet
logDebug(s"Clean up absolute partition directories for overwriting: $absPartitionPaths")
absPartitionPaths.foreach(fs.delete(_, true))
}
for ((src, dst) <- filesToMove) {
fs.rename(new Path(src), new Path(dst))
}
fs.delete(absPathStagingDir, true)

if (dynamicPartitionOverwrite) {
val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
for (part <- partitionPaths) {
val finalPartPath = new Path(path, part)
fs.delete(finalPartPath, true)
fs.rename(new Path(stagingDir, part), finalPartPath)
}
}

fs.delete(stagingDir, true)
}
}

override def abortJob(jobContext: JobContext): Unit = {
committer.abortJob(jobContext, JobStatus.State.FAILED)
if (hasValidPath) {
val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
fs.delete(absPathStagingDir, true)
val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
fs.delete(stagingDir, true)
}
}

override def setupTask(taskContext: TaskAttemptContext): Unit = {
committer = setupCommitter(taskContext)
committer.setupTask(taskContext)
addedAbsPathFiles = mutable.Map[String, String]()
partitionPaths = mutable.Set[String]()
}

override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
val attemptId = taskContext.getTaskAttemptID
SparkHadoopMapRedUtil.commitTask(
committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
new TaskCommitMessage(addedAbsPathFiles.toMap)
new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
}

override def abortTask(taskContext: TaskAttemptContext): Unit = {
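A hedged sketch of the filesystem sequence commitJob runs under dynamic partition overwrite, not part of the patch; the paths and partition values are hypothetical, and the absolute-location branch and error handling are omitted.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val destination = new Path("/path/to/destination")
val stagingDir = new Path(destination, ".spark-staging-job-123")
val committedPartitions = Set("a=1/b=1", "a=1/b=2")      // collected from task commit messages

val fs: FileSystem = destination.getFileSystem(new Configuration())
for (part <- committedPartitions) {
  val finalPartPath = new Path(destination, part)
  fs.delete(finalPartPath, true)                          // drop the old partition data
  fs.rename(new Path(stagingDir, part), finalPartPath)    // move staged files into place
}
fs.delete(stagingDir, true)                               // clean up the staging directory
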
@@ -1066,6 +1066,24 @@ object SQLConf {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(100)

object PartitionOverwriteMode extends Enumeration {
val STATIC, DYNAMIC = Value
}

val PARTITION_OVERWRITE_MODE =
buildConf("spark.sql.sources.partitionOverwriteMode")
.doc("When INSERT OVERWRITE a partitioned data source table, we currently support 2 modes: " +
"static and dynamic. In static mode, Spark deletes all the partitions that match the " +
"partition specification(e.g. PARTITION(a=1,b)) in the INSERT statement, before " +
"overwriting. In dynamic mode, Spark doesn't delete partitions ahead, and only overwrite " +
"those partitions that have data written into it at runtime. By default we use static " +
"mode to keep the same behavior of Spark prior to 2.3. Note that this config doesn't " +
"affect Hive serde tables, as they are always overwritten with dynamic mode.")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(PartitionOverwriteMode.values.map(_.toString))
.createWithDefault(PartitionOverwriteMode.STATIC.toString)

object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
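
A hedged usage sketch of the new config, not part of the patch; it assumes an active SparkSession `spark` and a table `t(i int)` partitioned by a column `p`.

// Dynamic mode: only partitions that receive rows from this statement are replaced;
// all other partitions of t are left untouched.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
spark.sql("INSERT OVERWRITE TABLE t PARTITION (p) SELECT 10, 1")

// Static mode (the default): the same statement first drops every partition matching
// the PARTITION spec (here all of them, since p is not pinned) before writing.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "static")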
@@ -1386,6 +1404,9 @@ class SQLConf extends Serializable with Logging {

def concatBinaryAsString: Boolean = getConf(CONCAT_BINARY_AS_STRING)

def partitionOverwriteMode: PartitionOverwriteMode.Value =
PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE))

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
import org.apache.spark.sql.util.SchemaUtils

/**
@@ -89,20 +90,29 @@ case class InsertIntoHadoopFsRelationCommand(
}

val pathExists = fs.exists(qualifiedOutputPath)
// If we are appending data to an existing dir.
val isAppend = pathExists && (mode == SaveMode.Append)

val enableDynamicOverwrite =
sparkSession.sessionState.conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
// This config only makes sense when we are overwriting a partitioned dataset with dynamic
// partition columns.
val dynamicPartitionOverwrite = enableDynamicOverwrite && mode == SaveMode.Overwrite &&
staticPartitions.size < partitionColumns.length

val committer = FileCommitProtocol.instantiate(
sparkSession.sessionState.conf.fileCommitProtocolClass,
jobId = java.util.UUID.randomUUID().toString,
outputPath = outputPath.toString)
outputPath = outputPath.toString,
dynamicPartitionOverwrite = dynamicPartitionOverwrite)

val doInsertion = (mode, pathExists) match {
case (SaveMode.ErrorIfExists, true) =>
throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
case (SaveMode.Overwrite, true) =>
if (ifPartitionNotExists && matchingPartitions.nonEmpty) {
false
} else if (dynamicPartitionOverwrite) {
// For dynamic partition overwrite, do not delete partition directories ahead of time.
true
} else {
deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer)
true
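
A hedged sketch, not part of the patch, of how the dynamicPartitionOverwrite condition above plays out for a table partitioned by (a, b), i.e. partitionColumns.length == 2, with the session config set to DYNAMIC; the helper is hypothetical and only restates the boolean logic.

val enableDynamicOverwrite = true   // spark.sql.sources.partitionOverwriteMode=dynamic
def dynamicOverwrite(isOverwrite: Boolean, staticPartitionCount: Int): Boolean =
  enableDynamicOverwrite && isOverwrite && staticPartitionCount < 2

dynamicOverwrite(isOverwrite = true, staticPartitionCount = 2)   // PARTITION(a=1, b=1): false, spec is fully static
dynamicOverwrite(isOverwrite = true, staticPartitionCount = 1)   // PARTITION(a=1, b):   true
dynamicOverwrite(isOverwrite = false, staticPartitionCount = 1)  // INSERT INTO:         false, not an overwrite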
@@ -126,7 +136,9 @@ case class InsertIntoHadoopFsRelationCommand(
catalogTable.get.identifier, newPartitions.toSeq.map(p => (p, None)),
ifNotExists = true).run(sparkSession)
}
if (mode == SaveMode.Overwrite) {
// For dynamic partition overwrite, we never remove partitions but only update existing
// ones.
if (mode == SaveMode.Overwrite && !dynamicPartitionOverwrite) {
val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
if (deletedPartitions.nonEmpty) {
AlterTableDropPartitionCommand(
@@ -29,11 +29,15 @@ import org.apache.spark.sql.internal.SQLConf
* A variant of [[HadoopMapReduceCommitProtocol]] that allows specifying the actual
* Hadoop output committer using an option specified in SQLConf.
*/
class SQLHadoopMapReduceCommitProtocol(jobId: String, path: String)
extends HadoopMapReduceCommitProtocol(jobId, path) with Serializable with Logging {
class SQLHadoopMapReduceCommitProtocol(
jobId: String,
path: String,
dynamicPartitionOverwrite: Boolean = false)
extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite)
with Serializable with Logging {

override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
var committer = context.getOutputFormatClass.newInstance().getOutputCommitter(context)
var committer = super.setupCommitter(context)

val configuration = context.getConfiguration
val clazz =
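A hedged usage sketch, not part of the patch, of overriding the Hadoop output committer through the SQLConf option this class reads; the key is assumed to be spark.sql.sources.outputCommitterClass, com.example.MyOutputCommitter is a hypothetical org.apache.hadoop.mapreduce.OutputCommitter subclass, and an active SparkSession `spark` with a DataFrame `df` is assumed.

spark.conf.set("spark.sql.sources.outputCommitterClass", "com.example.MyOutputCommitter")

// Subsequent file-based writes go through SQLHadoopMapReduceCommitProtocol, which picks up
// the configured committer while still passing dynamicPartitionOverwrite to the parent
// HadoopMapReduceCommitProtocol.
df.write.mode("overwrite").parquet("/tmp/output")
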
@@ -21,6 +21,8 @@ import java.io.File

import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils

@@ -442,4 +444,80 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
assert(e.contains("Only Data Sources providing FileFormat are supported"))
}
}

test("SPARK-20236: dynamic partition overwrite without catalog table") {
withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
withTempPath { path =>
Seq((1, 1, 1)).toDF("i", "part1", "part2")
.write.partitionBy("part1", "part2").parquet(path.getAbsolutePath)
checkAnswer(spark.read.parquet(path.getAbsolutePath), Row(1, 1, 1))

Seq((2, 1, 1)).toDF("i", "part1", "part2")
.write.partitionBy("part1", "part2").mode("overwrite").parquet(path.getAbsolutePath)
checkAnswer(spark.read.parquet(path.getAbsolutePath), Row(2, 1, 1))

Seq((2, 2, 2)).toDF("i", "part1", "part2")
.write.partitionBy("part1", "part2").mode("overwrite").parquet(path.getAbsolutePath)
checkAnswer(spark.read.parquet(path.getAbsolutePath), Row(2, 1, 1) :: Row(2, 2, 2) :: Nil)
}
}
}

test("SPARK-20236: dynamic partition overwrite") {
withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
withTable("t") {
sql(
"""
|create table t(i int, part1 int, part2 int) using parquet
|partitioned by (part1, part2)
""".stripMargin)

sql("insert into t partition(part1=1, part2=1) select 1")
checkAnswer(spark.table("t"), Row(1, 1, 1))

sql("insert overwrite table t partition(part1=1, part2=1) select 2")
checkAnswer(spark.table("t"), Row(2, 1, 1))

sql("insert overwrite table t partition(part1=2, part2) select 2, 2")
checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Nil)

sql("insert overwrite table t partition(part1=1, part2=2) select 3")
checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)

sql("insert overwrite table t partition(part1=1, part2) select 4, 1")
checkAnswer(spark.table("t"), Row(4, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)
}
}
}

test("SPARK-20236: dynamic partition overwrite with customer partition path") {
withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
withTable("t") {
sql(
"""
|create table t(i int, part1 int, part2 int) using parquet
|partitioned by (part1, part2)
""".stripMargin)

val path1 = Utils.createTempDir()
sql(s"alter table t add partition(part1=1, part2=1) location '$path1'")
sql(s"insert into t partition(part1=1, part2=1) select 1")
checkAnswer(spark.table("t"), Row(1, 1, 1))

sql("insert overwrite table t partition(part1=1, part2=1) select 2")
checkAnswer(spark.table("t"), Row(2, 1, 1))

sql("insert overwrite table t partition(part1=2, part2) select 2, 2")
checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Nil)

val path2 = Utils.createTempDir()
sql(s"alter table t add partition(part1=1, part2=2) location '$path2'")
sql("insert overwrite table t partition(part1=1, part2=2) select 3")
checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)

sql("insert overwrite table t partition(part1=1, part2) select 4, 1")
checkAnswer(spark.table("t"), Row(4, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)
}
}
}
}