
[SPARK-20236][SQL] dynamic partition overwrite #18714


Closed
cloud-fan wants to merge 2 commits into apache:master from cloud-fan:overwrite-partition

Conversation

cloud-fan
Contributor

What changes were proposed in this pull request?

When overwriting a partitioned table with dynamic partition columns, the behavior differs between data source and Hive tables.

data source table: delete all partition directories that match the static partition values provided in the insert statement.

Hive table: only delete partition directories that actually have data written into them.

This PR adds a new config that lets users opt into Hive's behavior.
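
A minimal sketch of the difference, assuming a SparkSession named `spark` (as in spark-shell); the table name and data are made up for illustration:

```scala
// Hypothetical table, partitioned by `part`, with two existing partitions.
spark.sql("CREATE TABLE t (id INT, part INT) USING parquet PARTITIONED BY (part)")
spark.sql("INSERT INTO t PARTITION (part=1) SELECT 1")
spark.sql("INSERT INTO t PARTITION (part=2) SELECT 2")

// Overwrite with `part` as a dynamic partition column; only part=1 receives new data.
spark.sql("INSERT OVERWRITE TABLE t PARTITION (part) SELECT 3, 1")

// Data source behavior (the current default): both part=1 and part=2 are deleted
// first, so only (3, 1) remains.
// Hive behavior (what the new config enables): only part=1 is rewritten, so the
// table ends up with (3, 1) and (2, 2).
```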

How was this patch tested?

new tests

@cloud-fan
Contributor Author

cc @gatorsmile @ericl

@SparkQA

SparkQA commented Jul 22, 2017

Test build #79870 has finished for PR 18714 at commit 8e7f5dd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class HadoopMapReduceCommitProtocol(
  • class SQLHadoopMapReduceCommitProtocol(

@@ -881,6 +881,16 @@ object SQLConf {
.intConf
.createWithDefault(10000)

val HIVE_STYLE_PARTITION_OVERWRITE =
buildConf("spark.sql.hiveStylePartitionOverwrite")
Contributor


I wouldn't name it like this. I'd describe what it actually does: tableOverwrite vs. partitionOverwrite.

Contributor Author


Do you want to hide the Hive reference in the name?

Contributor Author


How about spark.sql.runtimePartitionOverwrite?

@@ -52,12 +55,22 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
*/
@transient private var addedAbsPathFiles: mutable.Map[String, String] = null

@transient private var partitionPaths: mutable.Set[String] = null

@transient private var stagingDir: Path = _
Contributor


Do you need to add these fields? It seems like they can be computed from addedAbsPathFiles and the constructor params respectively.

Member


Maybe faster? We are not deleting the files one by one. We drop the whole staging directory.

Contributor

@ericl ericl Jul 23, 2017


I mean, we can turn stagingDir into private def stagingDir or a local variable in a function.

Similarly, partitionPaths can be computed as filesToMove.map(_.getPath).distinct during the commit phase.

Contributor Author


stagingDir may not be needed, but we do need partitionPaths, which tracks partitions that use the default path.
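
For readers following this thread, here is a rough sketch (not the PR's exact code) of how a commit protocol can use a staging directory plus a set of written partition paths to implement the Hive-style overwrite: only the partitions that received data are deleted at the destination before the staged output is moved into place. The helper name `movePartitionedOutput` is made up; `stagingDir` and `partitionPaths` follow the names in the diff above.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch of the commit-side reconciliation when Hive-style/runtime partition
// overwrite is enabled. `partitionPaths` holds relative paths like "a=1/b=2"
// that tasks wrote under `stagingDir`.
def movePartitionedOutput(
    fs: FileSystem,
    stagingDir: Path,
    destDir: Path,
    partitionPaths: Set[String]): Unit = {
  partitionPaths.foreach { part =>
    val finalPartPath = new Path(destDir, part)
    // Delete only the partition directories we actually wrote to...
    fs.delete(finalPartPath, true)
    fs.mkdirs(finalPartPath.getParent)
    // ...then move the staged partition directory into its final location.
    fs.rename(new Path(stagingDir, part), finalPartPath)
  }
  // Finally drop the whole staging directory.
  fs.delete(stagingDir, true)
}
```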

@@ -881,6 +881,16 @@ object SQLConf {
.intConf
.createWithDefault(10000)

val HIVE_STYLE_PARTITION_OVERWRITE =
buildConf("spark.sql.hiveStylePartitionOverwrite")
.doc("When insert overwrite a partitioned table with dynamic partition columns, Spark " +
Member


dynamic -> dynamic and mixed

class HadoopMapReduceCommitProtocol(
jobId: String,
path: String,
runtimeOverwritePartition: Boolean = false)
Member


It's not easy to understand the purpose of this parameter from the name alone. We might need a @param doc.

"to keep the previous behavior, which means Spark will delete all partition directories " +
"that match the static partition values provided in the insert statement.")
.booleanConf
.createWithDefault(false)
Member

@gatorsmile gatorsmile Jul 23, 2017


Could we turn this on, see how many existing test cases fail, and then turn it off again?

Contributor Author


A lot of tests will fail because we explicitly assert the old behavior, but I can try.

@@ -162,5 +198,8 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
val tmp = new Path(src)
tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
}
if (runtimeOverwritePartition) {
Member


If we read this function without the context of this PR, we might ask why we drop the staging directory only when runtimeOverwritePartition is true.

Is there any reason to keep the behavior unchanged when runtimeOverwritePartition is false?

val stagingDir: String = committer match {
val stagingDir: Path = committer match {
case _ if runtimeOverwritePartition =>
assert(dir.isDefined)
Member


Could you add an error message in case the assertion fails? It helps when reading the log.

* e.g. a=1/b=2. Files under these partitions will be saved into staging directory and moved to
* destination directory at the end, if `runtimeOverwritePartition` is true.
*/
@transient private var partitionPaths: mutable.Set[String] = null
Contributor Author


cc @ericl: addedAbsPathFiles only tracks partitions with a custom path; we still need partitionPaths to track partitions with the default path.

"will delete all partition directories that match the static partition values provided " +
"in the insert statement.")
.booleanConf
.createWithDefault(false)
Contributor Author


cc @gatorsmile I decided not to try it, because this config only takes effect when overwriting a partitioned table with dynamic partition columns, and turning it on changes the behavior and fails all the related tests.

@SparkQA

SparkQA commented Jul 23, 2017

Test build #79887 has finished for PR 18714 at commit 9d6eeaf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ericl
Contributor

ericl commented Jul 23, 2017 via email

@cloud-fan cloud-fan changed the title [SPARK-20236][SQL] hive style partition overwrite [SPARK-20236][SQL] runtime partition overwrite Jul 24, 2017
@cloud-fan cloud-fan force-pushed the overwrite-partition branch from 8abffd0 to 0630372 Compare July 24, 2017 16:13
@SparkQA

SparkQA commented Jul 24, 2017

Test build #79910 has finished for PR 18714 at commit 8abffd0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 24, 2017

Test build #79911 has finished for PR 18714 at commit 0630372.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

retest this please

@SparkQA

SparkQA commented Aug 8, 2017

Test build #80415 has finished for PR 18714 at commit 0630372.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

outputPath = outputPath.toString,
// If there is no matching partitions, overwrite is same as append, so here we only enable
// runtime partition overwrite when there are matching partitions.
runtimeOverwritePartition = runtimePartitionOverwrite && matchingPartitions.nonEmpty)
Member


matchingPartitions.nonEmpty needs to be removed too.

@@ -2658,4 +2659,62 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
checkAnswer(sql("SELECT __auto_generated_subquery_name.i from (SELECT i FROM v)"), Row(1))
}
}

test("SPARK-20236: runtime partition overwrite") {
Member

@gatorsmile gatorsmile Aug 9, 2017


Need to move the test cases with more test cases
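
For readers without the full diff, a hedged sketch of what a test along these lines might look like (not the PR's actual test body; `withSQLConf`, `withTable`, and `checkAnswer` are helpers available in Spark's SQL test suites, and the config constant name follows the earlier diff hunk, which was renamed during review):

```scala
test("SPARK-20236: runtime partition overwrite") {
  withSQLConf(SQLConf.HIVE_STYLE_PARTITION_OVERWRITE.key -> "true") {
    withTable("t") {
      sql("CREATE TABLE t (id INT, part INT) USING parquet PARTITIONED BY (part)")
      sql("INSERT INTO t PARTITION (part=1) SELECT 1")
      sql("INSERT INTO t PARTITION (part=2) SELECT 2")
      // Only part=1 receives new data, so part=2 must survive the overwrite.
      sql("INSERT OVERWRITE TABLE t PARTITION (part) SELECT 3, 1")
      checkAnswer(spark.table("t"), Row(3, 1) :: Row(2, 2) :: Nil)
    }
  }
}
```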

@gatorsmile
Member

LGTM except the above comments. Thanks!

@jiangxb1987
Contributor

Do we still want this? @cloud-fan @gatorsmile

@gatorsmile
Member

gatorsmile commented Oct 3, 2017

Yes, this is still needed. The target is the 2.3 release.

@felixcheung
Member

ping, very interested in this.

@jiangxb1987
Contributor

Is this PR still targeted to 2.3? @cloud-fan @gatorsmile

@felixcheung
Member

ah yes, please please :)

@cloud-fan cloud-fan force-pushed the overwrite-partition branch from 0630372 to 65a9741 Compare January 2, 2018 08:14
@cloud-fan cloud-fan changed the title [SPARK-20236][SQL] runtime partition overwrite [SPARK-20236][SQL] dynamic partition overwrite Jan 2, 2018
@SparkQA

SparkQA commented Jan 2, 2018

Test build #85590 has finished for PR 18714 at commit 65a9741.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class HadoopMapReduceCommitProtocol(
  • class SQLHadoopMapReduceCommitProtocol(

class HadoopMapReduceCommitProtocol(
jobId: String,
path: String,
dynamicPartitionOverwrite: Boolean = false)
Member


Indents.

val stagingDir: Path = committer match {
case _ if dynamicPartitionOverwrite =>
assert(dir.isDefined,
"The dataset to be written must be partitioned when runtimeOverwritePartition is true.")
Member


runtimeOverwritePartition -> dynamicPartitionOverwrite

@gatorsmile
Member

LGTM

@SparkQA

SparkQA commented Jan 3, 2018

Test build #85621 has finished for PR 18714 at commit f7745a0.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Jan 3, 2018

Test build #85622 has finished for PR 18714 at commit f7745a0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

Thanks! Merged to master/2.3

asfgit pushed a commit that referenced this pull request Jan 3, 2018
## What changes were proposed in this pull request?

When overwriting a partitioned table with dynamic partition columns, the behavior differs between data source and Hive tables.

data source table: delete all partition directories that match the static partition values provided in the insert statement.

Hive table: only delete partition directories that actually have data written into them.

This PR adds a new config that lets users opt into Hive's behavior.

## How was this patch tested?

new tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18714 from cloud-fan/overwrite-partition.

(cherry picked from commit a66fe36)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
@asfgit asfgit closed this in a66fe36 Jan 3, 2018
@koertkuipers
Contributor

Should this be exposed per write instead of as a global setting?
e.g.
dataframe.write.csv.partitionMode(Dynamic).partitionBy(...).save(...)

@cloud-fan
Contributor Author

@koertkuipers Makes sense to me, but I won't add a new API in DataFrameWriter for it; I think we can just add a write option for file sources, e.g. df.write.option("partitionOverwriteMode", "dynamic").parquet...
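
A hedged sketch of how the per-write option suggested above might look once it exists (it is only being proposed here and tracked later in this thread as SPARK-24860, so the option name and behavior are assumptions, not a shipped API at this point):

```scala
// Assumes an existing DataFrame `df` with a `part` column; the option name
// "partitionOverwriteMode" is the one proposed in the comment above.
df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .partitionBy("part")
  .parquet("/path/to/table")
```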

@koertkuipers
Contributor

@cloud-fan OK, that works just as well

@koertkuipers
Contributor

@cloud-fan i created SPARK-24860 for this
