[SPARK-21839][SQL] Support SQL config for ORC compression #19055
Conversation
Test build #81139 has finished for PR 19055 at commit
Hi, @cloud-fan and @gatorsmile .
Test build #81140 has finished for PR 19055 at commit
FYI, this PR can be reviewed before the other ORC PRs.
Hi, @gatorsmile .
/**
 * Compression codec to use. By default snappy compression.
 * Compression codec to use. By default use the value specified in SQLConf.
This is confusing. You can remove "By default use the value specified in SQLConf."
Sure, I will. Historically, I brought this from ParquetOptions.
 * Acceptable values are defined in [[shortOrcCompressionCodecNames]].
 */
val compressionCodec: String = {
val compressionCodecClassName: String = {
// `orc.compress` is a ORC configuration. So, here we respect this as an option but
// `compression` has higher precedence than `orc.compress`. It means if both are set,
// we will use `compression`.
Instead, update this paragraph to explain the priority.
Sure.
"uncompressed, snappy, zlib.")
  .stringConf
  .transform(_.toLowerCase(Locale.ROOT))
  .checkValues(Set("uncompressed", "snappy", "zlib"))
Why is this inconsistent with `shortOrcCompressionCodecNames`?
// The ORC compression short names
private val shortOrcCompressionCodecNames = Map(
  "none" -> "NONE",
  "uncompressed" -> "NONE",
  "snappy" -> "SNAPPY",
  "zlib" -> "ZLIB",
  "lzo" -> "LZO")
}
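The short-name map above can be sketched as a standalone lookup. This is a hypothetical mini-version for illustration, not the actual Spark source; `OrcCodecNames`, `resolve`, and the error message are invented names:

```scala
import java.util.Locale

object OrcCodecNames {
  // Mirror of the short-name map quoted above.
  private val shortOrcCompressionCodecNames = Map(
    "none" -> "NONE",
    "uncompressed" -> "NONE",
    "snappy" -> "SNAPPY",
    "zlib" -> "ZLIB",
    "lzo" -> "LZO")

  // Normalize a user-supplied codec name, case-insensitively; the
  // getOrElse default is by-name, so the throw only happens on a miss.
  def resolve(name: String): String =
    shortOrcCompressionCodecNames.getOrElse(
      name.toLowerCase(Locale.ROOT),
      throw new IllegalArgumentException(s"Codec [$name] is not available."))
}
```

Note how both `"none"` and `"uncompressed"` normalize to the same ORC value `"NONE"`, which is the asymmetry discussed in this thread.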
`lzo` will be added later. For `none`, I thought the usage in SQLConf was discouraged intentionally, like in Parquet. I wanted to show the same behavior as Parquet.
I added "none" here. Thanks!
I am not familiar with ORC. The above is just a quick look at the changes made in this PR.
 * Acceptable values are defined in [[shortOrcCompressionCodecNames]].
 */
val compressionCodec: String = {
val compressionCodecClassName: String = {
// `orc.compress` is a ORC configuration. So, here we respect this as an option but
// `compression` has higher precedence than `orc.compress`. It means if both are set,
// we will use `compression`.
I guess we should update here too:

* This will override `orc.compress`.</li>

and I think we should prioritise `compression` > `spark.sql.orc.compression.codec` > `orc.compress` to be consistent.
Oh, I see.
Ur, there is a technical issue. `spark.sql.orc.compression.codec` has a default value, `snappy`. So, `orc.compress` cannot be used in that order.
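To illustrate the issue: because the SQL config always resolves to a value (its default is `snappy`), any key placed after it in the lookup chain is unreachable, so the conf has to sit last. A minimal sketch with illustrative names (`resolveCodec` and `sessionConfCodec` are not the Spark internals):

```scala
// Sketch: the session conf always has a value, so it must come last in the
// chain; if it came before `orc.compress`, that option would never be read.
def resolveCodec(options: Map[String, String], sessionConfCodec: String): String =
  options.get("compression")             // highest precedence: `compression` option
    .orElse(options.get("orc.compress")) // then the native ORC option
    .getOrElse(sessionConfCodec)         // finally the conf (default "snappy")
```

With this ordering, `orc.compress` is still honored whenever the `compression` option is absent, and the conf only kicks in when neither option is set.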
"uncompressed, snappy, zlib.")
  .stringConf
  .transform(_.toLowerCase(Locale.ROOT))
  .checkValues(Set("uncompressed", "snappy", "zlib"))
Hm, but `lzo` still seeks `org.apache.hadoop.hive.ql.io.orc.LzoCodec` though. Wouldn't it be better to leave it as what it is intended for now, if I understood correctly? It should be quite rare, but disallowing `lzo` actually closes off the chance of a custom use allowed via the `.option()` API.
I think `none` was introduced as a compression key for `.option()` first for Parquet, and it was matched for consistency. I think we have kept `none` specific to the `.option()` API somehow.
Yep. "none" is added back. "lzo" is not supported; it fails in the current master branch, and we have an ignored test case for that LZO failure.
 * Acceptable values are defined in [[shortOrcCompressionCodecNames]].
 */
val compressionCodec: String = {
val compressionCodecClassName: String = {
I guess you matched this to `ParquetOptions`, but actually I think it's `ParquetOptions` that should be changed. I found this minor nit, but only after it got merged - #15580 (comment)
Oh, may I change Parquet at this time?
Anyway, I'll revert this for the other data sources.
Thank you for review, @gatorsmile and @HyukjinKwon . I'll update the PR.
Test build #81178 has finished for PR 19055 at commit
Thank you again, @gatorsmile and @HyukjinKwon . I updated the PR.
"uncompressed, snappy, zlib.")
  .stringConf
  .transform(_.toLowerCase(Locale.ROOT))
  .checkValues(Set("none", "uncompressed", "snappy", "zlib"))
@dongjoon-hyun, I think my only main concern is the inconsistency between the `compression` option and this config. If `lzo` were an unknown key and it directly threw an exception (or even if this were not there in the first place), I would have been okay, but it looks like it attempts to find `org.apache.hadoop.hive.ql.io.orc.LzoCodec`:
java.lang.IllegalArgumentException: LZO is not available.
at org.apache.hadoop.hive.ql.io.orc.WriterImpl.createCodec(WriterImpl.java:331)
at org.apache.hadoop.hive.ql.io.orc.WriterImpl.<init>(WriterImpl.java:201)
at org.apache.hadoop.hive.ql.io.orc.OrcFile.createWriter(OrcFile.java:464)
at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.write(OrcOutputFormat.java:74)
at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.write(OrcOutputFormat.java:55)
...
...
case LZO:
try {
Class<? extends CompressionCodec> lzo =
(Class<? extends CompressionCodec>)
JavaUtils.loadClass("org.apache.hadoop.hive.ql.io.orc.LzoCodec");
return lzo.newInstance();
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("LZO is not available.", e);
} catch (InstantiationException e) {
throw new IllegalArgumentException("Problem initializing LZO", e);
} catch (IllegalAccessException e) {
throw new IllegalArgumentException("Insufficient access to LZO", e);
}
...
It appears that if we somehow provide `org.apache.hadoop.hive.ql.io.orc.LzoCodec` in the classpath (as an extreme case, one implemented by a user), it should work, if I read this correctly. This should be quite rare though.
Does your set of ORC-related PRs eventually support `lzo` correctly?
I see. You mean user-provided Hive or libraries. It makes sense. I will add `lzo` here too. Thank you for pointing that out.
Test build #81199 has finished for PR 19055 at commit
Retest this please.
test("SPARK-21839: Add SQL config for ORC compression") {
  val conf = sqlContext.sessionState.conf
  assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "SNAPPY")
Add a comment here to explain the test scenario:
// to test that the default of `spark.sql.orc.compression.codec` is `snappy`
Sure
  new OrcOptions(Map("orc.compress" -> "zlib"), conf).compressionCodec == "ZLIB")
}
Seq("NONE", "SNAPPY", "ZLIB", "LZO").foreach { c =>
// to test all the valid options of `spark.sql.orc.compression.codec`: "none", "uncompressed", "snappy", "zlib", "lzo". You missed one of them.
Ur, it's intentional. It's tested above; "UNCOMPRESSED" is replaced with "NONE", so I omitted it here.
To add that, we need an `if` condition. I'll add that.
withSQLConf(SQLConf.ORC_COMPRESSION.key -> "uncompressed") {
  assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "NONE")
  assert(
    new OrcOptions(Map("orc.compress" -> "zlib"), conf).compressionCodec == "ZLIB")
Also, please add another scenario where users specify `compression`.
Thank you for review again, @gatorsmile . I fixed them.
// Test all the valid options of spark.sql.orc.compression.codec
Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO").foreach { c =>
  withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) {
    if (c == "UNCOMPRESSED") {
val expected = if ... else c
assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == expected)
Yep! It's much better.
Thanks!
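The suggested refactor collapses the `if` branch into a single `expected` value. A runnable sketch of the pattern, with `normalizeCodec` as a hypothetical stand-in for the real `OrcOptions` codec normalization (not the Spark class itself):

```scala
import java.util.Locale

// Hypothetical stand-in for the OrcOptions normalization under test:
// "uncompressed" maps to "NONE"; every other codec keeps its own name.
def normalizeCodec(c: String): String =
  if (c.toLowerCase(Locale.ROOT) == "uncompressed") "NONE"
  else c.toUpperCase(Locale.ROOT)

// All valid options; only "UNCOMPRESSED" needs a different expected value,
// so the asymmetry lives in one `val expected` instead of an if/else block.
Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO").foreach { c =>
  val expected = if (c == "UNCOMPRESSED") "NONE" else c
  assert(normalizeCodec(c) == expected)
}
```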
Test build #81218 has finished for PR 19055 at commit
Test build #81219 has finished for PR 19055 at commit
Test build #81220 has finished for PR 19055 at commit
LGTM except for minor comments. @gatorsmile, do you maybe have more comments?
@@ -18,12 +18,13 @@
package org.apache.spark.sql.hive.orc

import java.io.File
import java.util.Locale
Looks unused in this test.
Oops. Thanks!
val orcCompressionConf = parameters.get(OrcRelation.ORC_COMPRESSION)
val codecName = parameters
  .get("compression")
  .orElse(orcCompressionConf)
  .getOrElse("snappy").toLowerCase(Locale.ROOT)
  .getOrElse(sqlConf.orcCompressionCodec)
Could we update the default values for consistency with the Parquet one?

* <li>`compression` (default `snappy`): compression codec to use when saving to file. This can be

spark/python/pyspark/sql/readwriter.py, line 855 in 51620e2:

default value, ``snappy``.
The default value is snappy, isn't it?
.createWithDefault("snappy")
I was thinking like:

spark/python/pyspark/sql/readwriter.py, lines 751 to 753 in 51620e2:

This will override ``spark.sql.parquet.compression.codec``. If None is set, it uses the value specified in ``spark.sql.parquet.compression.codec``.

Wouldn't we use the value set in `spark.sql.parquet.compression.codec` by default if `compression` is unset via the option API?
Actually, I thought the purpose of this configuration is rather for setting the default compression codec for ORC datasource ..
Yes. This is the priority. If `compression` and `orc.compression` are unset via option, we use SQLConf:

`compression` -> `orc.compression` -> `spark.sql.orc.compression.codec`
The main purpose of this PR is to support users to control ORC compression by using SQLConf, too.
The default codec is unchanged and the priority is the same. Also, all previous user-given options are respected.
Ah, it looks like I should have been clearer. I meant fixing the comment for the default value from `snappy` to `spark.sql.orc.compression.codec`.
Thank you! I'll fix that.
Thank you very much for review, @HyukjinKwon ! 👍
According to your review comments, I updated the comment, too.
Test build #81244 has finished for PR 19055 at commit
Test build #81246 has finished for PR 19055 at commit
Test build #81247 has finished for PR 19055 at commit
// `orc.compress` is a ORC configuration. So, here we respect this as an option but
// `compression` has higher precedence than `orc.compress`. It means if both are set,
// we will use `compression`.
// `compression`, `orc.compress`, and `spark.sql.orc.compression.codec` is used in order.
`is used in order` -> `are in order of precedence from highest to lowest`
LGTM except a minor comment.
Thank you, @gatorsmile . I fixed it, too.
Test build #81265 has finished for PR 19055 at commit
Merged to master.
Thank you for merging, @HyukjinKwon !
Also, thank you again, @gatorsmile !
What changes were proposed in this pull request?

This PR aims to support `spark.sql.orc.compression.codec` like Parquet's `spark.sql.parquet.compression.codec`. Users can use SQLConf to control ORC compression, too.

How was this patch tested?

Pass the Jenkins with new and updated test cases.