[SPARK-3007][SQL]Add Dynamic Partition support to Spark Sql hive #2226
Conversation
Can one of the admins verify this patch?
ok to test
Hi @marmbrus and @liancheng, the latest code has passed `dev/lint-scala` and `sbt/sbt catalyst/test sql/test hive/test` locally.
```scala
fileSinkConf,
jobConfSer,
sc.hiveconf.getBoolean("hive.exec.compress.output", false),
dynamicPartNum)
```
Bad indentation :)
Please also submit golden answer files for newly whitelisted test cases.
ok to test
@baishuo Just added a note about Hive golden answer files in the Spark Wiki https://cwiki.apache.org/confluence/display/SPARK/Spark+SQL+Internals, please refer to this page to generate and submit those files. Thanks!
ok to test
The golden files related to HiveCompatibilitySuite already exist in the master branch of Spark, so there is no need to add them.
can this PR be tested? :)
test this please
Can one of the admins verify this patch?
test this please
```scala
if (outputPath == null) {
  throw new IOException("Undefined job output-path")
}
val workPath = new Path(outputPath, dynamicPartPath.substring(1)) // remove "/"
```
What about `.stripPrefix("/")`?
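For context, a minimal REPL comparison of the two approaches (the value of `dynamicPartPath` is an illustrative assumption, not taken from the PR):

```scala
// stripPrefix is a no-op when the prefix is absent, while substring(1)
// unconditionally drops the first character, whatever it is.
val dynamicPartPath = "/part=2014"   // hypothetical dynamic partition path
dynamicPartPath.substring(1)         // "part=2014"
dynamicPartPath.stripPrefix("/")     // "part=2014"

// Without the leading slash the two diverge:
"part=2014".substring(1)             // "art=2014" -- silently wrong
"part=2014".stripPrefix("/")         // "part=2014" -- unchanged
```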
ok to test
```scala
writerMap += (record._2 -> tempWriter)
tempWriter
}
}
```
I think the indentation here is off.
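For what it's worth, the look-up-or-create logic in this diff can also be expressed with `getOrElseUpdate`. A minimal sketch, with `PartitionWriter` and `createWriter` as hypothetical stand-ins for the PR's actual types:

```scala
import scala.collection.mutable

// Hypothetical stand-in for Hive's record writer type.
trait PartitionWriter { def write(row: String): Unit }

object WriterCache {
  private val writerMap = mutable.HashMap.empty[String, PartitionWriter]

  private def createWriter(partPath: String): PartitionWriter =
    new PartitionWriter {
      def write(row: String): Unit = println(s"$partPath -> $row")
    }

  // getOrElseUpdate expresses "look up, or create and cache" in one call,
  // replacing the hand-written if/else in the diff above.
  def writerFor(partPath: String): PartitionWriter =
    writerMap.getOrElseUpdate(partPath, createWriter(partPath))
}
```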
QA tests have started for PR 2226 at commit …
```scala
writer.commitJob()
/*
 * if rowVal is null or "",will return HiveConf.get(hive.exec.default.partition.name) with default
 * */
```
Should be:

```scala
/**
 * Returns `rowVal` as a String. If `rowVal` is null or equal to "", returns the default partition name.
 */
```
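A sketch of the helper that comment describes, assuming a simplified signature (the PR's actual method may differ); `__HIVE_DEFAULT_PARTITION__` is the stock default of `hive.exec.default.partition.name`:

```scala
object DefaultPartitionExample extends App {
  // Returns rowVal as a String; null and "" fall back to the default
  // partition name, matching the behavior the Scaladoc above describes.
  def getDynamicPartValue(rowVal: Any, defaultPartName: String): String =
    rowVal match {
      case null | "" => defaultPartName
      case other     => other.toString
    }

  println(getDynamicPartValue(null, "__HIVE_DEFAULT_PARTITION__")) // __HIVE_DEFAULT_PARTITION__
  println(getDynamicPartValue(2014, "__HIVE_DEFAULT_PARTITION__")) // 2014
}
```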
Thanks again for working on this! This will be an awesome feature to have. :) I did a pretty detailed pass and made a few comments. A few high-level notes: …
```scala
 * Create a `HiveRecordWriter`. A relative dynamic partition path can be used to create a writer
 * for writing data to a dynamic partition.
 */
def open() {
```
Seems `open` is not a good name here. Maybe rename it?
Maybe `init()`? Also, I forgot to update the comments.
Is it always called after `executorSideSetup`? If so, can we rename it to something like `setupWriter` (or `initWriter`) and call it at the end of `executorSideSetup` instead of calling it in `writeToFile`?
Yea, I also realized this. Renamed this to `initWriters` and merged it into `executorSideSetup`. Also merged the `commit()` call into `close()`.
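Roughly, the lifecycle this discussion converges on looks like the following sketch (method bodies elided; this mirrors the shape agreed on above, not the exact merged code):

```scala
// Sketch: writer setup is folded into executor-side setup, and the job
// commit is folded into close(), so callers only see setup/write/close.
abstract class WriterContainerSketch {
  def executorSideSetup(jobId: Int, splitId: Int, attemptId: Int): Unit = {
    // ... task-side Hadoop setup ...
    initWriters() // formerly open(), now invoked here rather than in writeToFile
  }

  // In the dynamic partitioning subclass this prepares the
  // partition-path -> writer map instead of a single writer.
  protected def initWriters(): Unit

  def close(): Unit = {
    // flush and close all writers, then commit --
    // the separate commit() call was merged into close()
  }
}
```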
Addressed @yhuai's comments except for adding more tests, will add them soon.
QA tests have started for PR 2226 at commit …
QA tests have finished for PR 2226 at commit …
QA tests have started for PR 2226 at commit …
QA tests have finished for PR 2226 at commit …
Test PASSed.
LGTM @marmbrus This is finally good to go :)
Thanks a lot to @liancheng :)
Awesome, thanks guys! Can you remove the …
Removed the "s from the title, @marmbrus.
I think I should say thank you to @liancheng and @yhuai. During the communication with you, I learned a lot :)
Hi @marmbrus, would you please run the merge script again? :)
I've merged this into master. Sorry for the delay -- unicode characters in the commit author names were causing our merge script to crash! |
Haha, have we updated our merge script to handle unicode? I should note that half of Spark SQL contributors are Chinese :) |
PR #2226 was reverted because it broke Jenkins builds for an unknown reason. This debugging PR aims to fix the Jenkins build. It also fixes two bugs:

1. Compression configurations in `InsertIntoHiveTable` are disabled by mistake. The `FileSinkDesc` object passed to the writer container doesn't have compression related configurations; these are not taken care of until `saveAsHiveFile` is called. This PR moves the compression code forward, right after instantiation of the `FileSinkDesc` object.

2. `PreInsertionCasts` doesn't take table partitions into account. In `castChildOutput`, `table.attributes` only contains non-partition columns, thus for a partitioned table `childOutputDataTypes` never equals `tableOutputDataTypes`. This results in a funny analyzed plan like this:

```
== Analyzed Logical Plan ==
InsertIntoTable Map(partcol1 -> None, partcol2 -> None), false
MetastoreRelation default, dynamic_part_table, None
Project [c_0#1164,c_1#1165,c_2#1166]
Project [c_0#1164,c_1#1165,c_2#1166]
Project [c_0#1164,c_1#1165,c_2#1166]
...
(repeats 99 times)
...
Project [c_0#1164,c_1#1165,c_2#1166]
Project [c_0#1164,c_1#1165,c_2#1166]
Project [1 AS c_0#1164,1 AS c_1#1165,1 AS c_2#1166]
Filter (key#1170 = 150)
MetastoreRelation default, src, None
```

Awful though this logical plan looks, it's harmless because all Projects will be eliminated by the optimizer. Guess that's why this issue hasn't been caught before.

Author: Cheng Lian <lian.cs.zju@gmail.com>
Author: baishuo(白硕) <vc_java@hotmail.com>
Author: baishuo <vc_java@hotmail.com>

Closes #2616 from liancheng/dp-fix and squashes the following commits:

21935b6 [Cheng Lian] Adds back deleted trailing space
f471c4b [Cheng Lian] PreInsertionCasts should take table partitions into account
a132c80 [Cheng Lian] Fixes output compression
9c6eb2d [Cheng Lian] Adds tests to verify dynamic partitioning folder layout
0eed349 [Cheng Lian] Addresses @yhuai's comments
26632c3 [Cheng Lian] Adds more tests
9227181 [Cheng Lian] Minor refactoring
c47470e [Cheng Lian] Refactors InsertIntoHiveTable to a Command
6fb16d7 [Cheng Lian] Fixes typo in test name, regenerated golden answer files
d53daa5 [Cheng Lian] Refactors dynamic partitioning support
b821611 [baishuo] pass check style
997c990 [baishuo] use HiveConf.DEFAULTPARTITIONNAME to replace hive.exec.default.partition.name
761ecf2 [baishuo] modify according micheal's advice
207c6ac [baishuo] modify for some bad indentation
caea6fb [baishuo] modify code to pass scala style checks
b660e74 [baishuo] delete a empty else branch
cd822f0 [baishuo] do a little modify
8e7268c [baishuo] update file after test
3f91665 [baishuo(白硕)] Update Cast.scala
8ad173c [baishuo(白硕)] Update InsertIntoHiveTable.scala
051ba91 [baishuo(白硕)] Update Cast.scala
d452eb3 [baishuo(白硕)] Update HiveQuerySuite.scala
37c603b [baishuo(白硕)] Update InsertIntoHiveTable.scala
98cfb1f [baishuo(白硕)] Update HiveCompatibilitySuite.scala
6af73f4 [baishuo(白硕)] Update InsertIntoHiveTable.scala
adf02f1 [baishuo(白硕)] Update InsertIntoHiveTable.scala
1867e23 [baishuo(白硕)] Update SparkHadoopWriter.scala
6bb5880 [baishuo(白硕)] Update HiveQl.scala
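The gist of the `PreInsertionCasts` fix described above, sketched with simplified stand-in types (the real code compares Catalyst attributes, not these toy `Column`s):

```scala
object PreInsertionCastsSketch {
  case class Column(name: String, dataType: String)

  def outputsMatch(childOutput: Seq[Column],
                   tableAttributes: Seq[Column], // non-partition columns only
                   partitionKeys: Seq[Column]): Boolean = {
    val childOutputDataTypes = childOutput.map(_.dataType)
    // Before the fix this compared against tableAttributes alone, which can
    // never match for a partitioned table, so a fresh Project was inserted
    // on every analysis pass (hence the 99 repeated Projects above).
    val tableOutputDataTypes = (tableAttributes ++ partitionKeys).map(_.dataType)
    childOutputDataTypes == tableOutputDataTypes
  }
}
```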
… versions

This is a follow-up of #2226 and #2616 to fix Jenkins master SBT build failures for lower Hadoop versions (1.0.x and 2.0.x). The root cause is the semantics difference of `FileSystem.globStatus()` between different versions of Hadoop, as illustrated by the following test code:

```scala
object GlobExperiments extends App {
  val conf = new Configuration()
  val fs = FileSystem.getLocal(conf)

  fs.globStatus(new Path("/tmp/wh/*/*/*")).foreach { status =>
    println(status.getPath)
  }
}
```

Target directory structure:

```
/tmp/wh
├── dir0
│   ├── dir1
│   │   └── level2
│   └── level1
└── level0
```

Hadoop 2.4.1 result:

```
file:/tmp/wh/dir0/dir1/level2
```

Hadoop 1.0.4 result:

```
file:/tmp/wh/dir0/dir1/level2
file:/tmp/wh/dir0/level1
file:/tmp/wh/level0
```

In #2226 and #2616, we call `FileOutputCommitter.commitJob()` at the end of the job, and the `_SUCCESS` mark file is written. When working with lower Hadoop versions, due to the `globStatus()` semantics issue, `_SUCCESS` is included as a separate partition data file by `Hive.loadDynamicPartitions()` and fails partition spec checking. The fix introduced in this PR is kind of a hack: when inserting data with dynamic partitioning, we intentionally avoid writing the `_SUCCESS` marker to work around this issue.

Hive doesn't suffer this issue because `FileSinkOperator` doesn't call `FileOutputCommitter.commitJob()`; instead, it calls `Utilities.mvFileToFinalPath()` to clean up the output directory and then loads it into the Hive warehouse with `loadDynamicPartitions()`/`loadPartition()`/`loadTable()`. This approach is better because it handles failed jobs and speculative tasks properly. We should add this step to `InsertIntoHiveTable` in another PR.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2663 from liancheng/dp-hadoop-1-fix and squashes the following commits:

0177dae [Cheng Lian] Fixes dynamic partitioning support for lower Hadoop versions
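The commit message doesn't show how the marker is suppressed. One standard Hadoop mechanism (an assumption here, not necessarily what the PR does) is the committer flag that controls whether `_SUCCESS` is written:

```scala
import org.apache.hadoop.conf.Configuration

object SuppressSuccessMarker extends App {
  val jobConf = new Configuration()
  // When false, FileOutputCommitter skips writing the _SUCCESS marker file
  // at job commit time. Shown only to illustrate the mechanism.
  jobConf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false)
}
```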
A new PR based on the new master. The changes are the same as in #1919.