
[SPARK-21839][SQL] Support SQL config for ORC compression #19055

Closed
wants to merge 10 commits into from

Conversation

@dongjoon-hyun (Member) commented Aug 25, 2017

What changes were proposed in this pull request?

This PR aims to support `spark.sql.orc.compression.codec`, analogous to Parquet's `spark.sql.parquet.compression.codec`, so that users can also control ORC compression through SQLConf.

How was this patch tested?

Passes Jenkins with new and updated test cases.
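
For illustration, a minimal sketch of the intended usage; the app name, paths, and DataFrame below are made-up example values, and ORC in Spark 2.x lived in the Hive module, so a Hive-enabled build is assumed:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("orc-compression-demo")  // hypothetical app name
  .master("local[*]")
  .enableHiveSupport()              // ORC required Hive support at the time of this PR
  .getOrCreate()
val df = spark.range(10).toDF("id")

// Session-wide default ORC codec via the new SQL config:
spark.conf.set("spark.sql.orc.compression.codec", "zlib")
df.write.orc("/tmp/orc_zlib")  // picks up zlib from the SQL config

// A per-write `compression` option still takes precedence over the SQL config:
df.write.option("compression", "snappy").orc("/tmp/orc_snappy")
```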

@SparkQA commented Aug 25, 2017

Test build #81139 has finished for PR 19055 at commit f3ccfec.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member, Author)

Hi, @cloud-fan and @gatorsmile. Could you review this ORC option PR? It is spun off from #18953 to reduce the review burden.

@SparkQA commented Aug 25, 2017

Test build #81140 has finished for PR 19055 at commit 5998c29.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member, Author)

FYI, this PR can be reviewed before the other ORC PRs.

@dongjoon-hyun (Member, Author)

Hi, @gatorsmile. Could you review this ORC configuration PR?


   /**
-   * Compression codec to use. By default snappy compression.
+   * Compression codec to use. By default use the value specified in SQLConf.
Member: This is confusing. You can remove "By default use the value specified in SQLConf."

Member (Author): Sure, I will. Historically, I brought this from ParquetOptions.

    * Acceptable values are defined in [[shortOrcCompressionCodecNames]].
    */
-  val compressionCodec: String = {
+  val compressionCodecClassName: String = {
     // `orc.compress` is a ORC configuration. So, here we respect this as an option but
     // `compression` has higher precedence than `orc.compress`. It means if both are set,
     // we will use `compression`.
Member: Instead, update this paragraph to explain the priority.

Member (Author): Sure.

"uncompressed, snappy, zlib.")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(Set("uncompressed", "snappy", "zlib"))
Member: Why is this inconsistent with shortOrcCompressionCodecNames?

  // The ORC compression short names
  private val shortOrcCompressionCodecNames = Map(
    "none" -> "NONE",
    "uncompressed" -> "NONE",
    "snappy" -> "SNAPPY",
    "zlib" -> "ZLIB",
    "lzo" -> "LZO")
}
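
As an aside, this map is what normalizes user-supplied names to ORC's canonical codec names. A rough sketch of such a lookup (a hypothetical helper assuming the map above is in scope, not code from this PR):

```scala
// Map a user-supplied short name (case-insensitive) to ORC's canonical
// codec name, rejecting unknown values with a descriptive error.
def toOrcCodecName(name: String): String = {
  val key = name.toLowerCase(java.util.Locale.ROOT)
  shortOrcCompressionCodecNames.getOrElse(key, throw new IllegalArgumentException(
    s"Codec [$name] is not available. " +
      s"Available codecs are ${shortOrcCompressionCodecNames.keys.mkString(", ")}."))
}

toOrcCodecName("Snappy")        // returns "SNAPPY"
toOrcCodecName("uncompressed")  // returns "NONE"
```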

Member (Author): lzo will be added later. For none, I thought its usage in SQLConf was intentionally discouraged, as in Parquet. I wanted to match Parquet's behavior.

Member (Author): I added "none" here. Thanks!

@gatorsmile (Member): I am not familiar with ORC. The above is just a quick look at the changes made in this PR.

    * Acceptable values are defined in [[shortOrcCompressionCodecNames]].
    */
-  val compressionCodec: String = {
+  val compressionCodecClassName: String = {
     // `orc.compress` is a ORC configuration. So, here we respect this as an option but
     // `compression` has higher precedence than `orc.compress`. It means if both are set,
     // we will use `compression`.
Member:

I guess we should update here too:

* This will override `orc.compress`.</li>

and I think we should prioritise

  1. compression
  2. spark.sql.orc.compression.codec
  3. orc.compress

to be consistent.

Member (Author): Oh, I see.

Member (Author): Ur, there is a technical issue: spark.sql.orc.compression.codec has a default value, snappy, so orc.compress cannot be used in that order.
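
Concretely, a sketch of the resolution chain this implies (assuming `parameters` and `sqlConf` as in OrcOptions; `orc.compress` is what `OrcRelation.ORC_COMPRESSION` refers to in the diff further down): the SQL config must be the final fallback precisely because it always yields a value.

```scala
// `compression` wins over `orc.compress`; the SQL config is the last fallback
// because sqlConf.orcCompressionCodec always returns a value (default: "snappy").
val codecName = parameters
  .get("compression")
  .orElse(parameters.get("orc.compress"))
  .getOrElse(sqlConf.orcCompressionCodec)
```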

"uncompressed, snappy, zlib.")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(Set("uncompressed", "snappy", "zlib"))
Member: Hm, but lzo still looks up org.apache.hadoop.hive.ql.io.orc.LzoCodec. Wouldn't it be better to leave it as it is for now, if I understood correctly? It should be seldom needed, but disallowing lzo also closes off the chance of custom use via the .option() API.

I think none was first introduced as a compression key for .option() in Parquet and matched here for consistency. We have somewhat kept none specific to the .option() API.

Member (Author): Yep, "none" is added back. "lzo" is not supported; it fails in the current master branch, and we have an ignored test case for that LZO failure.

    * Acceptable values are defined in [[shortOrcCompressionCodecNames]].
    */
-  val compressionCodec: String = {
+  val compressionCodecClassName: String = {
Member: I guess you matched this to ParquetOptions, but actually I think it's ParquetOptions that should be changed. I found this minor nit only after it got merged - #15580 (comment)

Member (Author): Oh, may I change Parquet at this time?

Member (Author): Anyway, I'll revert this for the other data sources.

@dongjoon-hyun (Member, Author): Thank you for the review, @gatorsmile and @HyukjinKwon. I'll update the PR.

@SparkQA commented Aug 28, 2017

Test build #81178 has finished for PR 19055 at commit afbb6f2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member, Author) commented Aug 28, 2017

Thank you again, @gatorsmile and @HyukjinKwon. I updated the PR.
I didn't change the LZO behavior because it is not yet supported in Apache Spark; please refer to the test case in OrcQuerySuite.scala. Could you review this again?

"uncompressed, snappy, zlib.")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(Set("none", "uncompressed", "snappy", "zlib"))
@HyukjinKwon (Member) commented Aug 29, 2017: @dongjoon-hyun, I think my only main concern is the inconsistency between the compression option and this config. If lzo were an unknown key that directly threw an exception (or were not there in the first place), I would have been okay, but it looks like it attempts to load org.apache.hadoop.hive.ql.io.orc.LzoCodec:

java.lang.IllegalArgumentException: LZO is not available.
	at org.apache.hadoop.hive.ql.io.orc.WriterImpl.createCodec(WriterImpl.java:331)
	at org.apache.hadoop.hive.ql.io.orc.WriterImpl.<init>(WriterImpl.java:201)
	at org.apache.hadoop.hive.ql.io.orc.OrcFile.createWriter(OrcFile.java:464)
	at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.write(OrcOutputFormat.java:74)
	at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.write(OrcOutputFormat.java:55)
       ...
...
      case LZO:
        try {
          Class<? extends CompressionCodec> lzo =
              (Class<? extends CompressionCodec>)
                  JavaUtils.loadClass("org.apache.hadoop.hive.ql.io.orc.LzoCodec");
          return lzo.newInstance();
        } catch (ClassNotFoundException e) {
          throw new IllegalArgumentException("LZO is not available.", e);
        } catch (InstantiationException e) {
          throw new IllegalArgumentException("Problem initializing LZO", e);
        } catch (IllegalAccessException e) {
          throw new IllegalArgumentException("Insufficient access to LZO", e);
        }
...

It appears that if we somehow provide org.apache.hadoop.hive.ql.io.orc.LzoCodec on the classpath (as an extreme case, one implemented by a user), it should work, if I read this correctly. This should be quite rare, though.

Does your set of ORC-related PRs eventually support lzo correctly?

Member (Author): I see. You mean user-provided Hive or other libraries. That makes sense. I will add lzo here, too. Thank you for pointing that out.
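
With lzo added back, the config entry would look roughly like this. This is a reconstruction pieced together from the fragments quoted in this thread; the doc string is paraphrased, and SQLConf's `buildConf` and `java.util.Locale` are assumed in scope:

```scala
// Reconstructed sketch of the SQLConf entry discussed in this thread.
val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec")
  .doc("Sets the compression codec used when writing ORC files. Acceptable values " +
    "include: none, uncompressed, snappy, zlib, lzo.")
  .stringConf
  .transform(_.toLowerCase(Locale.ROOT))            // case-insensitive values
  .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
  .createWithDefault("snappy")                      // default stays snappy
```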

@SparkQA commented Aug 29, 2017

Test build #81199 has finished for PR 19055 at commit 437d181.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member, Author)

Retest this please.


test("SPARK-21839: Add SQL config for ORC compression") {
val conf = sqlContext.sessionState.conf
assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "SNAPPY")
Member: Add a comment here to explain the test scenario:
// to test the default of spark.sql.orc.compression.codec is snappy

Member (Author): Sure.

      new OrcOptions(Map("orc.compress" -> "zlib"), conf).compressionCodec == "ZLIB")
  }

  Seq("NONE", "SNAPPY", "ZLIB", "LZO").foreach { c =>
Member: Add a comment:
// to test all the valid options of spark.sql.orc.compression.codec

The valid values are "none", "uncompressed", "snappy", "zlib", "lzo"; you missed one of them.

Member (Author): Ur, it's intentional; it's tested above. "UNCOMPRESSED" is replaced with "NONE", so I omitted it here.

Member (Author): To add that, we need an if condition. I'll add it.

    withSQLConf(SQLConf.ORC_COMPRESSION.key -> "uncompressed") {
      assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "NONE")
      assert(
        new OrcOptions(Map("orc.compress" -> "zlib"), conf).compressionCodec == "ZLIB")
Member: Also, please add another scenario where users specify compression.

@dongjoon-hyun (Member, Author): Thank you for the review again, @gatorsmile. I fixed them.

    // Test all the valid options of spark.sql.orc.compression.codec
    Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO").foreach { c =>
      withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) {
        if (c == "UNCOMPRESSED") {
@gatorsmile (Member) commented Aug 29, 2017:
val expected = if ... else c
assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == expected)
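
Spelled out, the suggestion amounts to something like this (a sketch assembled from the snippets above, assuming the suite's `conf` and its `withSQLConf` helper):

```scala
// Test all the valid values of spark.sql.orc.compression.codec;
// OrcOptions normalizes "UNCOMPRESSED" to "NONE".
Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO").foreach { c =>
  withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) {
    val expected = if (c == "UNCOMPRESSED") "NONE" else c
    assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == expected)
  }
}
```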

Member (Author): Yep! It's much better.

Member (Author): Thanks!

@SparkQA commented Aug 29, 2017

Test build #81218 has finished for PR 19055 at commit 437d181.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 29, 2017

Test build #81219 has finished for PR 19055 at commit aafbfbb.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 29, 2017

Test build #81220 has finished for PR 19055 at commit 8aebcd3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member) left a comment: LGTM except for minor comments. @gatorsmile, do you maybe have more comments?

@@ -18,12 +18,13 @@
 package org.apache.spark.sql.hive.orc

 import java.io.File
+import java.util.Locale
Member: Looks unused in this test.

Member (Author): Oops. Thanks!

     val orcCompressionConf = parameters.get(OrcRelation.ORC_COMPRESSION)
     val codecName = parameters
       .get("compression")
       .orElse(orcCompressionConf)
-      .getOrElse("snappy").toLowerCase(Locale.ROOT)
+      .getOrElse(sqlConf.orcCompressionCodec)
Member: Could we update the default values for consistency with the Parquet one?

* <li>`compression` (default `snappy`): compression codec to use when saving to file. This can be

default value, ``snappy``.

Member (Author): The default value is snappy, isn't it?

.createWithDefault("snappy")

Member: I was thinking of something like:

This will override ``spark.sql.parquet.compression.codec``. If None
is set, it uses the value specified in
``spark.sql.parquet.compression.codec``.

Wouldn't we use the value set in spark.sql.parquet.compression.codec by default if compression is unset via the option API?

Member: Actually, I thought the purpose of this configuration was rather to set the default compression codec for the ORC data source.

Member (Author): Yes, this is the priority. If compression and orc.compress are unset via option, we use SQLConf:
compression -> orc.compress -> spark.sql.orc.compression.codec

Member (Author): The main purpose of this PR is to let users control ORC compression via SQLConf, too.

@dongjoon-hyun (Member, Author) commented Aug 30, 2017: The default codec is unchanged and the priority is the same. Also, all previous user-given options are respected.

Member: Ah, it looks like I should have been clearer. I meant fixing the comment for the default value from snappy to spark.sql.orc.compression.codec.

Member (Author): Thank you! I'll fix that.

@dongjoon-hyun (Member, Author): Thank you very much for the review, @HyukjinKwon! 👍

@dongjoon-hyun (Member, Author): According to your review comments, I updated the comment, too. Thank you again for spending your time reviewing my PRs!

@SparkQA commented Aug 30, 2017

Test build #81244 has finished for PR 19055 at commit 94e624e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 30, 2017

Test build #81246 has finished for PR 19055 at commit 34e2845.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 30, 2017

Test build #81247 has finished for PR 19055 at commit 4c0300c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

-    // `orc.compress` is a ORC configuration. So, here we respect this as an option but
-    // `compression` has higher precedence than `orc.compress`. It means if both are set,
-    // we will use `compression`.
+    // `compression`, `orc.compress`, and `spark.sql.orc.compression.codec` is used in order.
Member: "is used in order" -> "are in order of precedence from highest to lowest"

@gatorsmile (Member): LGTM except a minor comment.

@dongjoon-hyun (Member, Author): Thank you, @gatorsmile. I fixed it, too.

@SparkQA commented Aug 30, 2017

Test build #81265 has finished for PR 19055 at commit 9620e46.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit asfgit closed this in d8f4540 Aug 30, 2017
@HyukjinKwon (Member): Merged to master.

@dongjoon-hyun (Member, Author): Thank you for merging, @HyukjinKwon!

@dongjoon-hyun (Member, Author): Also, thank you again, @gatorsmile!
