[SPARK-20236][SQL] dynamic partition overwrite #18714
Test build #79870 has finished for PR 18714 at commit
|
@@ -881,6 +881,16 @@ object SQLConf {
      .intConf
      .createWithDefault(10000)

  val HIVE_STYLE_PARTITION_OVERWRITE =
    buildConf("spark.sql.hiveStylePartitionOverwrite")
I wouldn't name it this way. I'd describe what it actually does, e.g. tableOverwrite vs. partitionOverwrite.
do you wanna hide the hive stuff?
How about `spark.sql.runtimePartitionOverwrite`?
@@ -52,12 +55,22 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
   */
  @transient private var addedAbsPathFiles: mutable.Map[String, String] = null

  @transient private var partitionPaths: mutable.Set[String] = null

  @transient private var stagingDir: Path = _
Do you need to add these fields? It seems like they can be computed from `addedAbsPathFiles` and the constructor params respectively.
Maybe faster? We are not deleting the files one by one. We drop the whole staging directory.
I mean, we can turn `stagingDir` into `private def stagingDir` or a local variable in a function. Similarly, `partitionPaths` can be computed as `filesToMove.map(_.getPath).distinct` during the commit phase.
`stagingDir` may not be needed, but we do need `partitionPaths`, which tracks partitions with the default path.
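To make the thread above concrete, here is a minimal in-memory sketch of the idea being discussed: tasks write into a staging directory, the protocol remembers which default-location partitions received data, and job commit replaces only those partitions before dropping the staging directory in one step. This is not the PR's code. The `CommitSketch` class, its method names, the map-backed "file system", and the staging directory name are all assumptions made for illustration.

```scala
import scala.collection.mutable

// Hypothetical model: the "file system" is a map from file path to content.
final class CommitSketch(outputPath: String, jobId: String) {
  val fs: mutable.Map[String, String] = mutable.Map.empty

  // Derived from the constructor params (as suggested in the review) rather than stored in a var.
  private def stagingDir: String = s"$outputPath/.spark-staging-$jobId"

  // Partitions such as "a=1/b=2" that use the default location and received data.
  private val partitionPaths: mutable.Set[String] = mutable.Set.empty

  // Task side: write under the staging directory and remember the partition.
  def writeTaskFile(partition: String, fileName: String, content: String): Unit = {
    partitionPaths += partition
    fs(s"$stagingDir/$partition/$fileName") = content
  }

  // Job side: overwrite only the partitions that were written, then drop staging.
  def commitJob(): Unit = {
    for (part <- partitionPaths) {
      val finalPrefix = s"$outputPath/$part/"
      val stagedPrefix = s"$stagingDir/$part/"
      // Delete the old contents of this partition only.
      fs.keys.filter(_.startsWith(finalPrefix)).toList.foreach(k => fs.remove(k))
      // Move the staged files into the final partition directory.
      fs.keys.filter(_.startsWith(stagedPrefix)).toList.foreach { staged =>
        val content = fs.remove(staged).get
        fs(finalPrefix + staged.stripPrefix(stagedPrefix)) = content
      }
    }
    // Remove anything left under the staging directory in one pass.
    fs.keys.filter(_.startsWith(stagingDir + "/")).toList.foreach(k => fs.remove(k))
    partitionPaths.clear()
  }
}
```

In this sketch, a partition that exists at the destination but receives no staged files is never touched, which is the behavior the thread wants `partitionPaths` to enable.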
@@ -881,6 +881,16 @@ object SQLConf {
      .intConf
      .createWithDefault(10000)

  val HIVE_STYLE_PARTITION_OVERWRITE =
    buildConf("spark.sql.hiveStylePartitionOverwrite")
      .doc("When insert overwrite a partitioned table with dynamic partition columns, Spark " +
`dynamic` -> `dynamic and mixed`
class HadoopMapReduceCommitProtocol(
  jobId: String,
  path: String,
  runtimeOverwritePartition: Boolean = false)
It's not easy to understand the purpose of this parameter from its name alone. We might need a `@param` description.
        "to keep the previous behavior, which means Spark will delete all partition directories " +
        "that match the static partition values provided in the insert statement.")
      .booleanConf
      .createWithDefault(false)
Could we set this to `true` and see how many existing test cases fail? Then turn it off again.
A lot of tests will fail because we explicitly assert the old behavior, but I can try.
@@ -162,5 +198,8 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
      val tmp = new Path(src)
      tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
    }
    if (runtimeOverwritePartition) {
If we read this function without the context of this PR, we might ask why the staging directory is dropped only when `runtimeOverwritePartition` is true. Is there any reason to leave it untouched when `runtimeOverwritePartition` is false?
    val stagingDir: String = committer match {
    val stagingDir: Path = committer match {
      case _ if runtimeOverwritePartition =>
        assert(dir.isDefined)
Could you add an error message in case the assertion fails? It helps when reading the log.
   * e.g. a=1/b=2. Files under these partitions will be saved into staging directory and moved to
   * destination directory at the end, if `runtimeOverwritePartition` is true.
   */
  @transient private var partitionPaths: mutable.Set[String] = null
cc @ericl: `addedAbsPathFiles` only tracks partitions with a custom path, so we still need `partitionPaths` to track partitions with the default path.
        "will delete all partition directories that match the static partition values provided " +
        "in the insert statement.")
      .booleanConf
      .createWithDefault(false)
CC @gatorsmile I decided not to try it, because this config only takes effect when overwriting a partitioned table with dynamic partition columns, and turning it on would change the behavior and fail all the related tests.
Test build #79887 has finished for PR 18714 at commit
|
Got it.
Force-pushed from 8abffd0 to 0630372.
Test build #79910 has finished for PR 18714 at commit
|
Test build #79911 has finished for PR 18714 at commit
|
retest this please |
Test build #80415 has finished for PR 18714 at commit
|
      outputPath = outputPath.toString,
      // If there is no matching partitions, overwrite is same as append, so here we only enable
      // runtime partition overwrite when there are matching partitions.
      runtimeOverwritePartition = runtimePartitionOverwrite && matchingPartitions.nonEmpty)
`matchingPartitions.nonEmpty` needs to be removed too.
@@ -2658,4 +2659,62 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
      checkAnswer(sql("SELECT __auto_generated_subquery_name.i from (SELECT i FROM v)"), Row(1))
    }
  }

  test("SPARK-20236: runtime partition overwrite") {
Need to move the test cases with more test cases
LGTM except the above comments. Thanks!
Do we still want this? @cloud-fan @gatorsmile
Yes. This is still needed. The target is 2.3 release
ping, very interested in this.
Is this PR still targeted to 2.3? @cloud-fan @gatorsmile
ah yes, please please :)
Force-pushed from 0630372 to 65a9741.
Test build #85590 has finished for PR 18714 at commit
|
class HadoopMapReduceCommitProtocol(
  jobId: String,
  path: String,
  dynamicPartitionOverwrite: Boolean = false)
Indents.
    val stagingDir: Path = committer match {
      case _ if dynamicPartitionOverwrite =>
        assert(dir.isDefined,
          "The dataset to be written must be partitioned when runtimeOverwritePartition is true.")
`runtimeOverwritePartition` -> `dynamicPartitionOverwrite`
LGTM |
Test build #85621 has finished for PR 18714 at commit
|
retest this please |
Test build #85622 has finished for PR 18714 at commit
|
Thanks! Merged to master/2.3
## What changes were proposed in this pull request?

When overwriting a partitioned table with dynamic partition columns, the behavior is different between data source and hive tables.

data source table: delete all partition directories that match the static partition values provided in the insert statement.

hive table: only delete partition directories which have data written into it

This PR adds a new config to make users be able to choose hive's behavior.

## How was this patch tested?

new tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18714 from cloud-fan/overwrite-partition.

(cherry picked from commit a66fe36)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
should this be exposed per write instead of as a global variable?
@koertkuipers makes sense to me, but I won't add a new API in
@cloud-fan OK, that works just as well
@cloud-fan i created SPARK-24860 for this
What changes were proposed in this pull request?
When overwriting a partitioned table with dynamic partition columns, the behavior differs between data source tables and Hive tables.
data source table: delete all partition directories that match the static partition values provided in the insert statement.
hive table: only delete partition directories that have data written into them.
This PR adds a new config that lets users choose Hive's behavior.
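The difference between the two behaviors can be sketched without Spark. The snippet below is an illustrative model only, not code from this PR: the `OverwriteSemantics` object and both function names are hypothetical, and a partition is represented simply as a map from partition column to value.

```scala
// Hypothetical model: a partition is a map of column -> value, e.g. Map("a" -> "1", "b" -> "2").
object OverwriteSemantics {
  type Partition = Map[String, String]

  // Data source table behavior: every existing partition that matches the
  // static partition values of the insert statement is deleted.
  def staticOverwrite(existing: Set[Partition], staticSpec: Partition): Set[Partition] =
    existing.filter(p => staticSpec.forall { case (k, v) => p.get(k).contains(v) })

  // Hive table behavior: only partitions that the insert actually writes
  // data into are deleted (and then rewritten).
  def dynamicOverwrite(existing: Set[Partition], written: Set[Partition]): Set[Partition] =
    existing.intersect(written)
}
```

For example, with existing partitions `a=1/b=1`, `a=1/b=2`, and `a=2/b=1`, an insert overwrite with static value `a=1` whose data only lands in `b=2` would delete both `a=1` partitions under the first model, but only `a=1/b=2` under the second.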
How was this patch tested?
new tests