
Commit f23dddf

dongjoon-hyun authored and cloud-fan committed
[SPARK-20682][SPARK-15474][SPARK-21791] Add new ORCFileFormat based on ORC 1.4.1
## What changes were proposed in this pull request?

Since [SPARK-2883](https://issues.apache.org/jira/browse/SPARK-2883), Apache Spark has supported Apache ORC inside the `sql/hive` module with a Hive dependency. This PR adds a new ORC data source inside `sql/core`, intended to eventually replace the old ORC data source. It resolves the following three issues.

- [SPARK-20682](https://issues.apache.org/jira/browse/SPARK-20682): Add new ORCFileFormat based on Apache ORC 1.4.1
- [SPARK-15474](https://issues.apache.org/jira/browse/SPARK-15474): ORC data source fails to write and read back empty dataframe
- [SPARK-21791](https://issues.apache.org/jira/browse/SPARK-21791): ORC should support column names with dot

## How was this patch tested?

Passes Jenkins with all existing tests plus new tests for SPARK-15474 and SPARK-21791.

Author: Dongjoon Hyun <dongjoon@apache.org>
Author: Wenchen Fan <wenchen@databricks.com>

Closes #19651 from dongjoon-hyun/SPARK-20682.
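To make the two behavioral fixes concrete, here is a hedged usage sketch (not part of the patch): it round-trips an empty DataFrame and a DataFrame with a dotted column name through ORC. The path is hypothetical, and it assumes a `spark` session whose `orc` format resolves to this new `sql/core` implementation.

```scala
// Hedged sketch, not part of the patch: exercises the two fixed behaviors.
import spark.implicits._

val base = "/tmp/orc-demo"  // hypothetical path

// SPARK-15474: an empty DataFrame with a schema can be written and read back.
Seq.empty[(Int, String)].toDF("id", "name").write.mode("overwrite").orc(s"$base/empty")
assert(spark.read.orc(s"$base/empty").count() == 0)

// SPARK-21791: column names containing dots round-trip; quote them with backticks.
Seq((1, "a")).toDF("col.with.dot", "plain").write.mode("overwrite").orc(s"$base/dots")
spark.read.orc(s"$base/dots").select("`col.with.dot`").show()
```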
1 parent d2cf95a commit f23dddf

File tree

7 files changed: +1009 -2 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala

Lines changed: 243 additions & 0 deletions (new file)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.orc

import org.apache.hadoop.io._
import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp}
import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

/**
 * A deserializer to deserialize ORC structs to Spark rows.
 */
class OrcDeserializer(
    dataSchema: StructType,
    requiredSchema: StructType,
    requestedColIds: Array[Int]) {

  private val resultRow = new SpecificInternalRow(requiredSchema.map(_.dataType))

  private val fieldWriters: Array[WritableComparable[_] => Unit] = {
    requiredSchema.zipWithIndex
      // The value of missing columns are always null, do not need writers.
      .filterNot { case (_, index) => requestedColIds(index) == -1 }
      .map { case (f, index) =>
        val writer = newWriter(f.dataType, new RowUpdater(resultRow))
        (value: WritableComparable[_]) => writer(index, value)
      }.toArray
  }

  private val validColIds = requestedColIds.filterNot(_ == -1)

  def deserialize(orcStruct: OrcStruct): InternalRow = {
    var i = 0
    while (i < validColIds.length) {
      val value = orcStruct.getFieldValue(validColIds(i))
      if (value == null) {
        resultRow.setNullAt(i)
      } else {
        fieldWriters(i)(value)
      }
      i += 1
    }
    resultRow
  }

  /**
   * Creates a writer to write ORC values to Catalyst data structure at the given ordinal.
   */
  private def newWriter(
      dataType: DataType, updater: CatalystDataUpdater): (Int, WritableComparable[_]) => Unit =
    dataType match {
      case NullType => (ordinal, _) =>
        updater.setNullAt(ordinal)

      case BooleanType => (ordinal, value) =>
        updater.setBoolean(ordinal, value.asInstanceOf[BooleanWritable].get)

      case ByteType => (ordinal, value) =>
        updater.setByte(ordinal, value.asInstanceOf[ByteWritable].get)

      case ShortType => (ordinal, value) =>
        updater.setShort(ordinal, value.asInstanceOf[ShortWritable].get)

      case IntegerType => (ordinal, value) =>
        updater.setInt(ordinal, value.asInstanceOf[IntWritable].get)

      case LongType => (ordinal, value) =>
        updater.setLong(ordinal, value.asInstanceOf[LongWritable].get)

      case FloatType => (ordinal, value) =>
        updater.setFloat(ordinal, value.asInstanceOf[FloatWritable].get)

      case DoubleType => (ordinal, value) =>
        updater.setDouble(ordinal, value.asInstanceOf[DoubleWritable].get)

      case StringType => (ordinal, value) =>
        updater.set(ordinal, UTF8String.fromBytes(value.asInstanceOf[Text].copyBytes))

      case BinaryType => (ordinal, value) =>
        val binary = value.asInstanceOf[BytesWritable]
        val bytes = new Array[Byte](binary.getLength)
        System.arraycopy(binary.getBytes, 0, bytes, 0, binary.getLength)
        updater.set(ordinal, bytes)

      case DateType => (ordinal, value) =>
        updater.setInt(ordinal, DateTimeUtils.fromJavaDate(value.asInstanceOf[DateWritable].get))

      case TimestampType => (ordinal, value) =>
        updater.setLong(ordinal, DateTimeUtils.fromJavaTimestamp(value.asInstanceOf[OrcTimestamp]))

      case DecimalType.Fixed(precision, scale) => (ordinal, value) =>
        val decimal = value.asInstanceOf[HiveDecimalWritable].getHiveDecimal()
        val v = Decimal(decimal.bigDecimalValue, decimal.precision(), decimal.scale())
        v.changePrecision(precision, scale)
        updater.set(ordinal, v)

      case st: StructType => (ordinal, value) =>
        val result = new SpecificInternalRow(st)
        val fieldUpdater = new RowUpdater(result)
        val fieldConverters = st.map(_.dataType).map { dt =>
          newWriter(dt, fieldUpdater)
        }.toArray
        val orcStruct = value.asInstanceOf[OrcStruct]

        var i = 0
        while (i < st.length) {
          val value = orcStruct.getFieldValue(i)
          if (value == null) {
            result.setNullAt(i)
          } else {
            fieldConverters(i)(i, value)
          }
          i += 1
        }

        updater.set(ordinal, result)

      case ArrayType(elementType, _) => (ordinal, value) =>
        val orcArray = value.asInstanceOf[OrcList[WritableComparable[_]]]
        val length = orcArray.size()
        val result = createArrayData(elementType, length)
        val elementUpdater = new ArrayDataUpdater(result)
        val elementConverter = newWriter(elementType, elementUpdater)

        var i = 0
        while (i < length) {
          val value = orcArray.get(i)
          if (value == null) {
            result.setNullAt(i)
          } else {
            elementConverter(i, value)
          }
          i += 1
        }

        updater.set(ordinal, result)

      case MapType(keyType, valueType, _) => (ordinal, value) =>
        val orcMap = value.asInstanceOf[OrcMap[WritableComparable[_], WritableComparable[_]]]
        val length = orcMap.size()
        val keyArray = createArrayData(keyType, length)
        val keyUpdater = new ArrayDataUpdater(keyArray)
        val keyConverter = newWriter(keyType, keyUpdater)
        val valueArray = createArrayData(valueType, length)
        val valueUpdater = new ArrayDataUpdater(valueArray)
        val valueConverter = newWriter(valueType, valueUpdater)

        var i = 0
        val it = orcMap.entrySet().iterator()
        while (it.hasNext) {
          val entry = it.next()
          keyConverter(i, entry.getKey)
          val value = entry.getValue
          if (value == null) {
            valueArray.setNullAt(i)
          } else {
            valueConverter(i, value)
          }
          i += 1
        }

        updater.set(ordinal, new ArrayBasedMapData(keyArray, valueArray))

      case udt: UserDefinedType[_] => newWriter(udt.sqlType, updater)

      case _ =>
        throw new UnsupportedOperationException(s"$dataType is not supported yet.")
    }

  private def createArrayData(elementType: DataType, length: Int): ArrayData = elementType match {
    case BooleanType => UnsafeArrayData.fromPrimitiveArray(new Array[Boolean](length))
    case ByteType => UnsafeArrayData.fromPrimitiveArray(new Array[Byte](length))
    case ShortType => UnsafeArrayData.fromPrimitiveArray(new Array[Short](length))
    case IntegerType => UnsafeArrayData.fromPrimitiveArray(new Array[Int](length))
    case LongType => UnsafeArrayData.fromPrimitiveArray(new Array[Long](length))
    case FloatType => UnsafeArrayData.fromPrimitiveArray(new Array[Float](length))
    case DoubleType => UnsafeArrayData.fromPrimitiveArray(new Array[Double](length))
    case _ => new GenericArrayData(new Array[Any](length))
  }

  /**
   * A base interface for updating values inside catalyst data structure like `InternalRow` and
   * `ArrayData`.
   */
  sealed trait CatalystDataUpdater {
    def set(ordinal: Int, value: Any): Unit

    def setNullAt(ordinal: Int): Unit = set(ordinal, null)
    def setBoolean(ordinal: Int, value: Boolean): Unit = set(ordinal, value)
    def setByte(ordinal: Int, value: Byte): Unit = set(ordinal, value)
    def setShort(ordinal: Int, value: Short): Unit = set(ordinal, value)
    def setInt(ordinal: Int, value: Int): Unit = set(ordinal, value)
    def setLong(ordinal: Int, value: Long): Unit = set(ordinal, value)
    def setDouble(ordinal: Int, value: Double): Unit = set(ordinal, value)
    def setFloat(ordinal: Int, value: Float): Unit = set(ordinal, value)
  }

  final class RowUpdater(row: InternalRow) extends CatalystDataUpdater {
    override def setNullAt(ordinal: Int): Unit = row.setNullAt(ordinal)
    override def set(ordinal: Int, value: Any): Unit = row.update(ordinal, value)

    override def setBoolean(ordinal: Int, value: Boolean): Unit = row.setBoolean(ordinal, value)
    override def setByte(ordinal: Int, value: Byte): Unit = row.setByte(ordinal, value)
    override def setShort(ordinal: Int, value: Short): Unit = row.setShort(ordinal, value)
    override def setInt(ordinal: Int, value: Int): Unit = row.setInt(ordinal, value)
    override def setLong(ordinal: Int, value: Long): Unit = row.setLong(ordinal, value)
    override def setDouble(ordinal: Int, value: Double): Unit = row.setDouble(ordinal, value)
    override def setFloat(ordinal: Int, value: Float): Unit = row.setFloat(ordinal, value)
  }

  final class ArrayDataUpdater(array: ArrayData) extends CatalystDataUpdater {
    override def setNullAt(ordinal: Int): Unit = array.setNullAt(ordinal)
    override def set(ordinal: Int, value: Any): Unit = array.update(ordinal, value)

    override def setBoolean(ordinal: Int, value: Boolean): Unit = array.setBoolean(ordinal, value)
    override def setByte(ordinal: Int, value: Byte): Unit = array.setByte(ordinal, value)
    override def setShort(ordinal: Int, value: Short): Unit = array.setShort(ordinal, value)
    override def setInt(ordinal: Int, value: Int): Unit = array.setInt(ordinal, value)
    override def setLong(ordinal: Int, value: Long): Unit = array.setLong(ordinal, value)
    override def setDouble(ordinal: Int, value: Double): Unit = array.setDouble(ordinal, value)
    override def setFloat(ordinal: Int, value: Float): Unit = array.setFloat(ordinal, value)
  }
}
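As a quick illustration of how `OrcDeserializer` is driven, the hedged sketch below (not part of the diff; the schema, values, and column ids are made up) feeds a hand-built ORC struct through `deserialize` and gets back a Catalyst `InternalRow`. `requestedColIds` maps each field of the required schema to its column index in the file, with `-1` marking a column missing from the file.

```scala
// Hedged usage sketch for OrcDeserializer; schema and values are illustrative.
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.orc.TypeDescription
import org.apache.orc.mapred.OrcStruct
import org.apache.spark.sql.execution.datasources.orc.OrcDeserializer
import org.apache.spark.sql.types._

val schema = StructType(Seq(StructField("a", IntegerType), StructField("b", StringType)))

// Build one ORC struct (one row) matching the schema.
val orcStruct = new OrcStruct(TypeDescription.fromString("struct<a:int,b:string>"))
orcStruct.setFieldValue(0, new IntWritable(1))
orcStruct.setFieldValue(1, new Text("x"))

// The file schema equals the required schema here, so the requested ids are 0 and 1.
val deserializer = new OrcDeserializer(schema, schema, Array(0, 1))
val row = deserializer.deserialize(orcStruct)
// row is an InternalRow holding (1, UTF8String("x")).
```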

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala

Lines changed: 137 additions & 2 deletions
@@ -17,10 +17,29 @@
 
 package org.apache.spark.sql.execution.datasources.orc
 
-import org.apache.orc.TypeDescription
+import java.io._
+import java.net.URI
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.orc._
+import org.apache.orc.OrcConf.{COMPRESS, MAPRED_OUTPUT_SCHEMA}
+import org.apache.orc.mapred.OrcStruct
+import org.apache.orc.mapreduce._
+
+import org.apache.spark.TaskContext
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+import org.apache.spark.util.SerializableConfiguration
 
 private[sql] object OrcFileFormat {
   private def checkFieldName(name: String): Unit = {
@@ -39,3 +58,119 @@ private[sql] object OrcFileFormat {
     names.foreach(checkFieldName)
   }
 }
+
+/**
+ * New ORC File Format based on Apache ORC.
+ */
+class OrcFileFormat
+  extends FileFormat
+  with DataSourceRegister
+  with Serializable {
+
+  override def shortName(): String = "orc"
+
+  override def toString: String = "ORC"
+
+  override def hashCode(): Int = getClass.hashCode()
+
+  override def equals(other: Any): Boolean = other.isInstanceOf[OrcFileFormat]
+
+  override def inferSchema(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      files: Seq[FileStatus]): Option[StructType] = {
+    OrcUtils.readSchema(sparkSession, files)
+  }
+
+  override def prepareWrite(
+      sparkSession: SparkSession,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+    val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
+
+    val conf = job.getConfiguration
+
+    conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, dataSchema.catalogString)
+
+    conf.set(COMPRESS.getAttribute, orcOptions.compressionCodec)
+
+    conf.asInstanceOf[JobConf]
+      .setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]])
+
+    new OutputWriterFactory {
+      override def newInstance(
+          path: String,
+          dataSchema: StructType,
+          context: TaskAttemptContext): OutputWriter = {
+        new OrcOutputWriter(path, dataSchema, context)
+      }
+
+      override def getFileExtension(context: TaskAttemptContext): String = {
+        val compressionExtension: String = {
+          val name = context.getConfiguration.get(COMPRESS.getAttribute)
+          OrcUtils.extensionsForCompressionCodecNames.getOrElse(name, "")
+        }
+
+        compressionExtension + ".orc"
+      }
+    }
+  }
+
+  override def isSplitable(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      path: Path): Boolean = {
+    true
+  }
+
+  override def buildReader(
+      sparkSession: SparkSession,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String],
+      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+    if (sparkSession.sessionState.conf.orcFilterPushDown) {
+      OrcFilters.createFilter(dataSchema, filters).foreach { f =>
+        OrcInputFormat.setSearchArgument(hadoopConf, f, dataSchema.fieldNames)
+      }
+    }
+
+    val broadcastedConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+    val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+
+    (file: PartitionedFile) => {
+      val conf = broadcastedConf.value.value
+
+      val requestedColIdsOrEmptyFile = OrcUtils.requestedColumnIds(
+        isCaseSensitive, dataSchema, requiredSchema, new Path(new URI(file.filePath)), conf)
+
+      if (requestedColIdsOrEmptyFile.isEmpty) {
+        Iterator.empty
+      } else {
+        val requestedColIds = requestedColIdsOrEmptyFile.get
+        assert(requestedColIds.length == requiredSchema.length,
+          "[BUG] requested column IDs do not match required schema")
+        conf.set(OrcConf.INCLUDE_COLUMNS.getAttribute,
+          requestedColIds.filter(_ != -1).sorted.mkString(","))
+
+        val fileSplit =
+          new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
+        val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+        val taskAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
+
+        val orcRecordReader = new OrcInputFormat[OrcStruct]
+          .createRecordReader(fileSplit, taskAttemptContext)
+        val iter = new RecordReaderIterator[OrcStruct](orcRecordReader)
+        Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+
+        val unsafeProjection = UnsafeProjection.create(requiredSchema)
+        val deserializer = new OrcDeserializer(dataSchema, requiredSchema, requestedColIds)
+        iter.map(value => unsafeProjection(deserializer.deserialize(value)))
+      }
+    }
+  }
+}
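The new format can also be addressed explicitly by its fully qualified class name, independent of how the `orc` short name is resolved at this point in the code base. Below is a hedged sketch with a hypothetical path.

```scala
// Hedged sketch: write and read through the new sql/core ORC format by class name.
val fmt = "org.apache.spark.sql.execution.datasources.orc.OrcFileFormat"
val path = "/tmp/orc-demo/native"  // hypothetical path

spark.range(10).write.format(fmt).mode("overwrite").save(path)
val df = spark.read.format(fmt).load(path)
df.printSchema()  // expects a single long column named "id"
```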
