[SPARK-21839][SQL] Support SQL config for ORC compression
## What changes were proposed in this pull request?

This PR aims to support `spark.sql.orc.compression.codec` like Parquet's `spark.sql.parquet.compression.codec`. Users can use SQLConf to control ORC compression, too.
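
With this config in place, the ORC codec can be chosen per session instead of per write call. A minimal sketch (assuming a running Hive-enabled `SparkSession` named `spark`, since the ORC source lives in the Hive module at this point; output paths are illustrative):

```scala
// Session-wide default for ORC writes; the value is validated against the allowed codec names.
spark.conf.set("spark.sql.orc.compression.codec", "zlib")
spark.range(10).write.orc("/tmp/orc_zlib")          // written with ZLIB

// The per-write `compression` option still takes precedence over the SQL config.
spark.range(10).write.option("compression", "lzo").orc("/tmp/orc_lzo")
```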

## How was this patch tested?

Pass the Jenkins with new and updated test cases.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes apache#19055 from dongjoon-hyun/SPARK-21839.
dongjoon-hyun authored and HyukjinKwon committed Aug 30, 2017
1 parent 6949a9c commit d8f4540
Showing 6 changed files with 57 additions and 15 deletions.
5 changes: 3 additions & 2 deletions python/pyspark/sql/readwriter.py
@@ -851,8 +851,9 @@ def orc(self, path, mode=None, partitionBy=None, compression=None):
         :param partitionBy: names of partitioning columns
         :param compression: compression codec to use when saving to file. This can be one of the
                             known case-insensitive shortened names (none, snappy, zlib, and lzo).
-                            This will override ``orc.compress``. If None is set, it uses the
-                            default value, ``snappy``.
+                            This will override ``orc.compress`` and
+                            ``spark.sql.orc.compression.codec``. If None is set, it uses the value
+                            specified in ``spark.sql.orc.compression.codec``.
 
         >>> orc_df = spark.read.orc('python/test_support/sql/orc_partitioned')
         >>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))
@@ -322,6 +322,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec")
+    .doc("Sets the compression codec used when writing ORC files. Acceptable values include: " +
+      "none, uncompressed, snappy, zlib, lzo.")
+    .stringConf
+    .transform(_.toLowerCase(Locale.ROOT))
+    .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
+    .createWithDefault("snappy")
+
   val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
     .doc("When true, enable filter pushdown for ORC files.")
     .booleanConf
@@ -998,6 +1006,8 @@ class SQLConf extends Serializable with Logging {
 
   def useCompression: Boolean = getConf(COMPRESS_CACHED)
 
+  def orcCompressionCodec: String = getConf(ORC_COMPRESSION)
+
   def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION)
 
   def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA)
@@ -517,9 +517,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    *
    * You can set the following ORC-specific option(s) for writing ORC files:
    * <ul>
-   * <li>`compression` (default `snappy`): compression codec to use when saving to file. This can be
-   * one of the known case-insensitive shorten names(`none`, `snappy`, `zlib`, and `lzo`).
-   * This will override `orc.compress`.</li>
+   * <li>`compression` (default is the value specified in `spark.sql.orc.compression.codec`):
+   * compression codec to use when saving to file. This can be one of the known case-insensitive
+   * shortened names (`none`, `snappy`, `zlib`, and `lzo`). This will override
+   * `orc.compress` and `spark.sql.orc.compression.codec`. If only `orc.compress` is given,
+   * it overrides `spark.sql.orc.compression.codec`.</li>
    * </ul>
    *
    * @since 1.5.0
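
To make the documented precedence concrete, a hedged sketch of the writer-side behavior (again assuming a Hive-enabled session named `spark`; the path is illustrative):

```scala
// `compression` > `orc.compress` > spark.sql.orc.compression.codec, per the scaladoc above.
val df = spark.range(10).toDF("id")
df.write
  .option("orc.compress", "ZLIB")    // ORC's own table property
  .option("compression", "snappy")   // the DataFrameWriter option wins over orc.compress
  .orc("/tmp/orc_precedence")
```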
@@ -68,7 +68,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
-    val orcOptions = new OrcOptions(options)
+    val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
 
     val configuration = job.getConfiguration
 
@@ -20,30 +20,34 @@ package org.apache.spark.sql.hive.orc
 import java.util.Locale
 
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Options for the ORC data source.
  */
-private[orc] class OrcOptions(@transient private val parameters: CaseInsensitiveMap[String])
+private[orc] class OrcOptions(
+    @transient private val parameters: CaseInsensitiveMap[String],
+    @transient private val sqlConf: SQLConf)
   extends Serializable {
 
   import OrcOptions._
 
-  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+  def this(parameters: Map[String, String], sqlConf: SQLConf) =
+    this(CaseInsensitiveMap(parameters), sqlConf)
 
   /**
-   * Compression codec to use. By default snappy compression.
+   * Compression codec to use.
    * Acceptable values are defined in [[shortOrcCompressionCodecNames]].
    */
   val compressionCodec: String = {
-    // `orc.compress` is a ORC configuration. So, here we respect this as an option but
-    // `compression` has higher precedence than `orc.compress`. It means if both are set,
-    // we will use `compression`.
+    // `compression`, `orc.compress`, and `spark.sql.orc.compression.codec` are considered
+    // in order of precedence, from highest to lowest.
     val orcCompressionConf = parameters.get(OrcRelation.ORC_COMPRESSION)
     val codecName = parameters
       .get("compression")
       .orElse(orcCompressionConf)
-      .getOrElse("snappy").toLowerCase(Locale.ROOT)
+      .getOrElse(sqlConf.orcCompressionCodec)
+      .toLowerCase(Locale.ROOT)
     if (!shortOrcCompressionCodecNames.contains(codecName)) {
       val availableCodecs = shortOrcCompressionCodecNames.keys.map(_.toLowerCase(Locale.ROOT))
       throw new IllegalArgumentException(s"Codec [$codecName] " +
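
Read in isolation, the new constructor boils down to a three-way fallback. A standalone sketch of the same lookup chain (illustrative only, not the real `OrcOptions` API; it also ignores the key case-insensitivity that `CaseInsensitiveMap` provides in the real class):

```scala
import java.util.Locale

// compression option -> orc.compress option -> session default from SQLConf
def resolveOrcCodec(options: Map[String, String], sessionDefault: String): String =
  options.get("compression")
    .orElse(options.get("orc.compress"))
    .getOrElse(sessionDefault)
    .toLowerCase(Locale.ROOT)

// resolveOrcCodec(Map("orc.compress" -> "ZLIB"), "snappy") == "zlib"
// resolveOrcCodec(Map.empty, "snappy")                     == "snappy"
```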
@@ -22,8 +22,8 @@ import java.io.File
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.{QueryTest, Row}
-import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -149,7 +149,8 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll {
   }
 
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
-    assert(new OrcOptions(Map("Orc.Compress" -> "NONE")).compressionCodec == "NONE")
+    val conf = sqlContext.sessionState.conf
+    assert(new OrcOptions(Map("Orc.Compress" -> "NONE"), conf).compressionCodec == "NONE")
   }
 
   test("SPARK-19459/SPARK-18220: read char/varchar column written by Hive") {
@@ -194,6 +195,30 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll {
       Utils.deleteRecursively(location)
     }
   }
+
+  test("SPARK-21839: Add SQL config for ORC compression") {
+    val conf = sqlContext.sessionState.conf
+    // Test if the default of spark.sql.orc.compression.codec is snappy
+    assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "SNAPPY")
+
+    // OrcOptions' parameters have a higher priority than the SQL configuration:
+    // `compression` -> `orc.compress` -> `spark.sql.orc.compression.codec`
+    withSQLConf(SQLConf.ORC_COMPRESSION.key -> "uncompressed") {
+      assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "NONE")
+      val map1 = Map("orc.compress" -> "zlib")
+      val map2 = Map("orc.compress" -> "zlib", "compression" -> "lzo")
+      assert(new OrcOptions(map1, conf).compressionCodec == "ZLIB")
+      assert(new OrcOptions(map2, conf).compressionCodec == "LZO")
+    }
+
+    // Test all the valid options of spark.sql.orc.compression.codec
+    Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO").foreach { c =>
+      withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) {
+        val expected = if (c == "UNCOMPRESSED") "NONE" else c
+        assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == expected)
+      }
+    }
+  }
 }
 
 class OrcSourceSuite extends OrcSuite {
