
Commit aa31e43

zhzhan authored and marmbrus committed
[SPARK-2883] [SQL] ORC data source for Spark SQL
This PR updates PR #6135 authored by zhzhan from Hortonworks.

----

This PR implements a Spark SQL data source for accessing ORC files.

> NOTE
>
> Although ORC is now an Apache TLP, the codebase is still tightly coupled with Hive. That's why the new ORC data source is under the `org.apache.spark.sql.hive` package and must be used with `HiveContext`. However, it doesn't require an existing Hive installation to access ORC files.

Key features:

1. Saving/loading ORC files without contacting the Hive metastore
2. Support for complex data types (i.e. array, map, and struct)
3. Awareness of common optimizations provided by Spark SQL:
   - Column pruning
   - Partition pruning
   - Filter push-down
4. Schema evolution support
5. Hive metastore table conversion

This PR also includes initial work done by scwf from Huawei (PR #3753).

Author: Zhan Zhang <zhazhan@gmail.com>
Author: Cheng Lian <lian@databricks.com>

Closes #6194 from liancheng/polishing-orc and squashes the following commits:

55ecd96 [Cheng Lian] Reorganizes ORC test suites
d4afeed [Cheng Lian] Addresses comments
21ada22 [Cheng Lian] Adds @since and @experimental annotations
128bd3b [Cheng Lian] ORC filter bug fix
d734496 [Cheng Lian] Polishes the ORC data source
2650a42 [Zhan Zhang] resolve review comments
3c9038e [Zhan Zhang] resolve review comments
7b3c7c5 [Zhan Zhang] save mode fix
f95abfd [Zhan Zhang] reuse test suite
7cc2c64 [Zhan Zhang] predicate fix
4e61c16 [Zhan Zhang] minor change
305418c [Zhan Zhang] orc data source support
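A minimal usage sketch of the new data source, assuming a `HiveContext` since the ORC source lives in the Hive module; the `people` DataFrame and the `/tmp/people.orc` path are hypothetical:

import org.apache.spark.sql.hive.HiveContext

// The ORC source is under org.apache.spark.sql.hive, so a HiveContext is required;
// no running Hive metastore is needed.
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._

// Hypothetical data; any DataFrame can be written the same way.
val people = sc.parallelize(Seq(("Alice", 30), ("Bob", 25))).toDF("name", "age")

// Save as ORC files under a plain filesystem path, bypassing the metastore.
people.write.format("orc").save("/tmp/people.orc")

// Read the files back; column pruning applies, and filter push-down can be enabled
// through spark.sql.orc.filterPushdown (see the SQLConf change below).
val df = sqlContext.read.format("orc").load("/tmp/people.orc")
df.select("name").where($"age" > 26).show()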
1 parent 9c7e802 commit aa31e43

File tree

14 files changed: +1477 −76 lines


sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 6 additions & 1 deletion
@@ -43,6 +43,8 @@ private[spark] object SQLConf {
   val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
   val PARQUET_USE_DATA_SOURCE_API = "spark.sql.parquet.useDataSourceApi"
 
+  val ORC_FILTER_PUSHDOWN_ENABLED = "spark.sql.orc.filterPushdown"
+
   val HIVE_VERIFY_PARTITIONPATH = "spark.sql.hive.verifyPartitionPath"
 
   val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
@@ -143,6 +145,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
   private[spark] def parquetUseDataSourceApi =
     getConf(PARQUET_USE_DATA_SOURCE_API, "true").toBoolean
 
+  private[spark] def orcFilterPushDown =
+    getConf(ORC_FILTER_PUSHDOWN_ENABLED, "false").toBoolean
+
   /** When true uses verifyPartitionPath to prune the path which is not exists. */
   private[spark] def verifyPartitionPath =
     getConf(HIVE_VERIFY_PARTITIONPATH, "true").toBoolean
@@ -254,7 +259,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
 
   private[spark] def dataFrameRetainGroupColumns: Boolean =
     getConf(DATAFRAME_RETAIN_GROUP_COLUMNS, "true").toBoolean
- 
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
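For illustration, the new flag can be toggled per session; a sketch that reuses the `sqlContext` and hypothetical ORC path from the example above:

// ORC filter push-down defaults to "false" (see orcFilterPushDown above); opt in explicitly:
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")

// Eligible predicates on subsequent ORC scans may then be pushed down to the ORC reader.
sqlContext.read.format("orc").load("/tmp/people.orc").where($"age" > 26).count()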

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala

Lines changed: 4 additions & 57 deletions
@@ -21,10 +21,9 @@ import java.io.File
 
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
-import scala.util.Try
 
-import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
-import org.apache.spark.util.Utils
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.{DataFrame, SaveMode}
 
 /**
  * A helper trait that provides convenient facilities for Parquet testing.
@@ -33,54 +32,9 @@ import org.apache.spark.util.Utils
  * convenient to use tuples rather than special case classes when writing test cases/suites.
  * Especially, `Tuple1.apply` can be used to easily wrap a single type/value.
  */
-private[sql] trait ParquetTest {
-  val sqlContext: SQLContext
-
+private[sql] trait ParquetTest extends SQLTestUtils {
   import sqlContext.implicits.{localSeqToDataFrameHolder, rddToDataFrameHolder}
-  import sqlContext.{conf, sparkContext}
-
-  protected def configuration = sparkContext.hadoopConfiguration
-
-  /**
-   * Sets all SQL configurations specified in `pairs`, calls `f`, and then restore all SQL
-   * configurations.
-   *
-   * @todo Probably this method should be moved to a more general place
-   */
-  protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
-    val (keys, values) = pairs.unzip
-    val currentValues = keys.map(key => Try(conf.getConf(key)).toOption)
-    (keys, values).zipped.foreach(conf.setConf)
-    try f finally {
-      keys.zip(currentValues).foreach {
-        case (key, Some(value)) => conf.setConf(key, value)
-        case (key, None) => conf.unsetConf(key)
-      }
-    }
-  }
-
-  /**
-   * Generates a temporary path without creating the actual file/directory, then pass it to `f`. If
-   * a file/directory is created there by `f`, it will be delete after `f` returns.
-   *
-   * @todo Probably this method should be moved to a more general place
-   */
-  protected def withTempPath(f: File => Unit): Unit = {
-    val path = Utils.createTempDir()
-    path.delete()
-    try f(path) finally Utils.deleteRecursively(path)
-  }
-
-  /**
-   * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
-   * returns.
-   *
-   * @todo Probably this method should be moved to a more general place
-   */
-  protected def withTempDir(f: File => Unit): Unit = {
-    val dir = Utils.createTempDir().getCanonicalFile
-    try f(dir) finally Utils.deleteRecursively(dir)
-  }
+  import sqlContext.sparkContext
 
   /**
    * Writes `data` to a Parquet file, which is then passed to `f` and will be deleted after `f`
@@ -105,13 +59,6 @@ private[sql] trait ParquetTest {
     withParquetFile(data)(path => f(sqlContext.read.parquet(path)))
   }
 
-  /**
-   * Drops temporary table `tableName` after calling `f`.
-   */
-  protected def withTempTable(tableName: String)(f: => Unit): Unit = {
-    try f finally sqlContext.dropTempTable(tableName)
-  }
-
   /**
    * Writes `data` to a Parquet file, reads it back as a [[DataFrame]] and registers it as a
    * temporary table named `tableName`, then call `f`. The temporary table together with the

sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala

Lines changed: 12 additions & 6 deletions
@@ -188,18 +188,20 @@ private[sql] class DDLParser(
 private[sql] object ResolvedDataSource {
 
   private val builtinSources = Map(
-    "jdbc" -> classOf[org.apache.spark.sql.jdbc.DefaultSource],
-    "json" -> classOf[org.apache.spark.sql.json.DefaultSource],
-    "parquet" -> classOf[org.apache.spark.sql.parquet.DefaultSource]
+    "jdbc" -> "org.apache.spark.sql.jdbc.DefaultSource",
+    "json" -> "org.apache.spark.sql.json.DefaultSource",
+    "parquet" -> "org.apache.spark.sql.parquet.DefaultSource",
+    "orc" -> "org.apache.spark.sql.hive.orc.DefaultSource"
   )
 
   /** Given a provider name, look up the data source class definition. */
   def lookupDataSource(provider: String): Class[_] = {
+    val loader = Utils.getContextOrSparkClassLoader
+
     if (builtinSources.contains(provider)) {
-      return builtinSources(provider)
+      return loader.loadClass(builtinSources(provider))
     }
 
-    val loader = Utils.getContextOrSparkClassLoader
     try {
       loader.loadClass(provider)
     } catch {
@@ -208,7 +210,11 @@ private[sql] object ResolvedDataSource {
           loader.loadClass(provider + ".DefaultSource")
         } catch {
           case cnf: java.lang.ClassNotFoundException =>
-            sys.error(s"Failed to load class for data source: $provider")
+            if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
+              sys.error("The ORC data source must be used with Hive support enabled.")
+            } else {
+              sys.error(s"Failed to load class for data source: $provider")
+            }
         }
     }
   }
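A sketch of how the resolution now behaves from the user side (paths hypothetical):

// "orc" is resolved through builtinSources and the context class loader, so the short name
// works as long as the Hive module (which contains the ORC DefaultSource) is on the classpath:
sqlContext.read.format("orc").load("/tmp/people.orc")

// A fully qualified package name also works via the provider + ".DefaultSource" fallback:
sqlContext.read.format("org.apache.spark.sql.hive.orc").load("/tmp/people.orc")

// Without Hive support on the classpath, the lookup now fails with the clearer message:
// "The ORC data source must be used with Hive support enabled."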
sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala (new file)

Lines changed: 81 additions & 0 deletions
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.test

import java.io.File

import scala.util.Try

import org.apache.spark.sql.SQLContext
import org.apache.spark.util.Utils

trait SQLTestUtils {
  val sqlContext: SQLContext

  import sqlContext.{conf, sparkContext}

  protected def configuration = sparkContext.hadoopConfiguration

  /**
   * Sets all SQL configurations specified in `pairs`, calls `f`, and then restore all SQL
   * configurations.
   *
   * @todo Probably this method should be moved to a more general place
   */
  protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
    val (keys, values) = pairs.unzip
    val currentValues = keys.map(key => Try(conf.getConf(key)).toOption)
    (keys, values).zipped.foreach(conf.setConf)
    try f finally {
      keys.zip(currentValues).foreach {
        case (key, Some(value)) => conf.setConf(key, value)
        case (key, None) => conf.unsetConf(key)
      }
    }
  }

  /**
   * Generates a temporary path without creating the actual file/directory, then pass it to `f`. If
   * a file/directory is created there by `f`, it will be delete after `f` returns.
   *
   * @todo Probably this method should be moved to a more general place
   */
  protected def withTempPath(f: File => Unit): Unit = {
    val path = Utils.createTempDir()
    path.delete()
    try f(path) finally Utils.deleteRecursively(path)
  }

  /**
   * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
   * returns.
   *
   * @todo Probably this method should be moved to a more general place
   */
  protected def withTempDir(f: File => Unit): Unit = {
    val dir = Utils.createTempDir().getCanonicalFile
    try f(dir) finally Utils.deleteRecursively(dir)
  }

  /**
   * Drops temporary table `tableName` after calling `f`.
   */
  protected def withTempTable(tableName: String)(f: => Unit): Unit = {
    try f finally sqlContext.dropTempTable(tableName)
  }
}
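A sketch of how a test suite might mix this trait in; the suite name, context wiring, and test body are hypothetical:

import org.scalatest.FunSuite

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.test.SQLTestUtils

class OrcRoundTripSuite extends FunSuite with SQLTestUtils {
  // In real suites this would come from a shared test context (e.g. TestSQLContext or TestHive).
  val sqlContext: SQLContext = ???

  import sqlContext.implicits._

  test("round-trips data through ORC under a temporary path") {
    withSQLConf("spark.sql.orc.filterPushdown" -> "true") {
      withTempPath { dir =>
        val df = (1 to 10).map(Tuple1.apply).toDF("value")
        df.write.format("orc").save(dir.getCanonicalPath)
        assert(sqlContext.read.format("orc").load(dir.getCanonicalPath).count() === 10)
      }
    }
  }
}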

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala

Lines changed: 32 additions & 8 deletions
@@ -18,8 +18,8 @@
 package org.apache.spark.sql.hive
 
 import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
-import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.hive.serde2.objectinspector.primitive._
+import org.apache.hadoop.hive.serde2.objectinspector.{StructField => HiveStructField, _}
 import org.apache.hadoop.hive.serde2.{io => hiveIo}
 import org.apache.hadoop.{io => hadoopIo}
 
@@ -122,7 +122,7 @@ import scala.collection.JavaConversions._
  * even a normal java object (POJO)
  * UnionObjectInspector: (tag: Int, object data) (TODO: not supported by SparkSQL yet)
 *
- * 3) ConstantObjectInspector: 
+ * 3) ConstantObjectInspector:
  * Constant object inspector can be either primitive type or Complex type, and it bundles a
 * constant value as its property, usually the value is created when the constant object inspector
 * constructed.
@@ -133,7 +133,7 @@ import scala.collection.JavaConversions._
 }
 }}}
 * Hive provides 3 built-in constant object inspectors:
- * Primitive Object Inspectors: 
+ * Primitive Object Inspectors:
 * WritableConstantStringObjectInspector
 * WritableConstantHiveVarcharObjectInspector
 * WritableConstantHiveDecimalObjectInspector
@@ -147,9 +147,9 @@ import scala.collection.JavaConversions._
 * WritableConstantByteObjectInspector
 * WritableConstantBinaryObjectInspector
 * WritableConstantDateObjectInspector
- * Map Object Inspector: 
+ * Map Object Inspector:
 * StandardConstantMapObjectInspector
- * List Object Inspector: 
+ * List Object Inspector:
 * StandardConstantListObjectInspector]]
 * Struct Object Inspector: Hive doesn't provide the built-in constant object inspector for Struct
 * Union Object Inspector: Hive doesn't provide the built-in constant object inspector for Union
@@ -250,9 +250,9 @@ private[hive] trait HiveInspectors {
         poi.getWritableConstantValue.getHiveDecimal)
     case poi: WritableConstantTimestampObjectInspector =>
       poi.getWritableConstantValue.getTimestamp.clone()
-    case poi: WritableConstantIntObjectInspector => 
+    case poi: WritableConstantIntObjectInspector =>
       poi.getWritableConstantValue.get()
-    case poi: WritableConstantDoubleObjectInspector => 
+    case poi: WritableConstantDoubleObjectInspector =>
      poi.getWritableConstantValue.get()
     case poi: WritableConstantBooleanObjectInspector =>
       poi.getWritableConstantValue.get()
@@ -306,7 +306,7 @@
       // In order to keep backward-compatible, we have to copy the
       // bytes with old apis
       val bw = x.getPrimitiveWritableObject(data)
-      val result = new Array[Byte](bw.getLength()) 
+      val result = new Array[Byte](bw.getLength())
       System.arraycopy(bw.getBytes(), 0, result, 0, bw.getLength())
       result
     case x: DateObjectInspector if x.preferWritable() =>
@@ -394,6 +394,30 @@
       identity[Any]
   }
 
+  /**
+   * Builds specific unwrappers ahead of time according to object inspector
+   * types to avoid pattern matching and branching costs per row.
+   */
+  def unwrapperFor(field: HiveStructField): (Any, MutableRow, Int) => Unit =
+    field.getFieldObjectInspector match {
+      case oi: BooleanObjectInspector =>
+        (value: Any, row: MutableRow, ordinal: Int) => row.setBoolean(ordinal, oi.get(value))
+      case oi: ByteObjectInspector =>
+        (value: Any, row: MutableRow, ordinal: Int) => row.setByte(ordinal, oi.get(value))
+      case oi: ShortObjectInspector =>
+        (value: Any, row: MutableRow, ordinal: Int) => row.setShort(ordinal, oi.get(value))
+      case oi: IntObjectInspector =>
+        (value: Any, row: MutableRow, ordinal: Int) => row.setInt(ordinal, oi.get(value))
+      case oi: LongObjectInspector =>
+        (value: Any, row: MutableRow, ordinal: Int) => row.setLong(ordinal, oi.get(value))
+      case oi: FloatObjectInspector =>
+        (value: Any, row: MutableRow, ordinal: Int) => row.setFloat(ordinal, oi.get(value))
+      case oi: DoubleObjectInspector =>
+        (value: Any, row: MutableRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value))
+      case oi =>
+        (value: Any, row: MutableRow, ordinal: Int) => row(ordinal) = unwrap(value, oi)
+    }
+
   /**
    * Converts native catalyst types to the types expected by Hive
   * @param a the value to be wrapped
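To sketch the intended use of `unwrapperFor` (hypothetical names: `soi` is a `StructObjectInspector`, `raw` one deserialized ORC struct, and `row` a reused `MutableRow`; `scala.collection.JavaConversions._` is assumed in scope, as it is in `HiveInspectors`):

// Build one unwrapper per struct field once, outside the per-row loop:
val fieldRefs = soi.getAllStructFieldRefs
val unwrappers = fieldRefs.map(unwrapperFor)

// Per raw row, fill the mutable row without re-matching on object inspector types:
var i = 0
while (i < fieldRefs.size) {
  val fieldValue = soi.getStructFieldData(raw, fieldRefs.get(i))
  if (fieldValue == null) row.setNullAt(i) else unwrappers(i)(fieldValue, row, i)
  i += 1
}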
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala (new file)

Lines changed: 69 additions & 0 deletions
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.orc

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.io.orc.{OrcFile, Reader}
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector

import org.apache.spark.Logging
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.hive.HiveMetastoreTypes
import org.apache.spark.sql.types.StructType

private[orc] object OrcFileOperator extends Logging {
  def getFileReader(pathStr: String, config: Option[Configuration] = None): Reader = {
    val conf = config.getOrElse(new Configuration)
    val fspath = new Path(pathStr)
    val fs = fspath.getFileSystem(conf)
    val orcFiles = listOrcFiles(pathStr, conf)

    // TODO Need to consider all files when schema evolution is taken into account.
    OrcFile.createReader(fs, orcFiles.head)
  }

  def readSchema(path: String, conf: Option[Configuration]): StructType = {
    val reader = getFileReader(path, conf)
    val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
    val schema = readerInspector.getTypeName
    HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType]
  }

  def getObjectInspector(path: String, conf: Option[Configuration]): StructObjectInspector = {
    getFileReader(path, conf).getObjectInspector.asInstanceOf[StructObjectInspector]
  }

  def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {
    val origPath = new Path(pathStr)
    val fs = origPath.getFileSystem(conf)
    val path = origPath.makeQualified(fs)
    val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath)
      .filterNot(_.isDir)
      .map(_.getPath)
      .filterNot(_.getName.startsWith("_"))
      .filterNot(_.getName.startsWith("."))

    if (paths == null || paths.size == 0) {
      throw new IllegalArgumentException(
        s"orcFileOperator: path $path does not have valid orc files matching the pattern")
    }

    paths
  }
}
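For illustration, code inside the `org.apache.spark.sql.hive.orc` package (the object is `private[orc]`) could read a schema like this; the path is hypothetical:

import org.apache.hadoop.conf.Configuration

// Picks the first ORC file found under the path and derives a Catalyst StructType from its
// ObjectInspector, without consulting the Hive metastore.
val schema = OrcFileOperator.readSchema("/tmp/people.orc", Some(new Configuration()))
println(schema)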
