Commit d11ce09

[SPARK-18355][SQL] Use Spark schema to read ORC table instead of ORC file schema
1 parent 76fb173 commit d11ce09

2 files changed: 77 additions & 11 deletions


sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala

Lines changed: 20 additions & 10 deletions
@@ -138,8 +138,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       if (maybePhysicalSchema.isEmpty) {
         Iterator.empty
       } else {
-        val physicalSchema = maybePhysicalSchema.get
-        OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema)
+        OrcRelation.setRequiredColumns(conf, dataSchema, requiredSchema)

         val orcRecordReader = {
           val job = Job.getInstance(conf)
@@ -163,6 +162,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
         // Unwraps `OrcStruct`s to `UnsafeRow`s
         OrcRelation.unwrapOrcStructs(
           conf,
+          dataSchema,
           requiredSchema,
           Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]),
           recordsIterator)
@@ -272,25 +272,35 @@ private[orc] object OrcRelation extends HiveInspectors {
   def unwrapOrcStructs(
       conf: Configuration,
       dataSchema: StructType,
+      requiredSchema: StructType,
       maybeStructOI: Option[StructObjectInspector],
       iterator: Iterator[Writable]): Iterator[InternalRow] = {
     val deserializer = new OrcSerde
-    val mutableRow = new SpecificInternalRow(dataSchema.map(_.dataType))
-    val unsafeProjection = UnsafeProjection.create(dataSchema)
+    val mutableRow = new SpecificInternalRow(requiredSchema.map(_.dataType))
+    val unsafeProjection = UnsafeProjection.create(requiredSchema)

     def unwrap(oi: StructObjectInspector): Iterator[InternalRow] = {
-      val (fieldRefs, fieldOrdinals) = dataSchema.zipWithIndex.map {
-        case (field, ordinal) => oi.getStructFieldRef(field.name) -> ordinal
+      val (fieldRefs, fieldOrdinals) = requiredSchema.zipWithIndex.map {
+        case (field, ordinal) =>
+          var ref = oi.getStructFieldRef(field.name)
+          if (ref == null) {
+            val maybeIndex = dataSchema.getFieldIndex(field.name)
+            if (maybeIndex.isDefined) {
+              ref = oi.getStructFieldRef("_col" + maybeIndex.get)
+            }
+          }
+          ref -> ordinal
       }.unzip

-      val unwrappers = fieldRefs.map(unwrapperFor)
+      val unwrappers = fieldRefs.map(r => if (r == null) null else unwrapperFor(r))

       iterator.map { value =>
         val raw = deserializer.deserialize(value)
         var i = 0
         val length = fieldRefs.length
         while (i < length) {
-          val fieldValue = oi.getStructFieldData(raw, fieldRefs(i))
+          val fieldRef = fieldRefs(i)
+          val fieldValue = if (fieldRef == null) null else oi.getStructFieldData(raw, fieldRefs(i))
           if (fieldValue == null) {
             mutableRow.setNullAt(fieldOrdinals(i))
           } else {
@@ -306,8 +316,8 @@ private[orc] object OrcRelation extends HiveInspectors {
   }

   def setRequiredColumns(
-      conf: Configuration, physicalSchema: StructType, requestedSchema: StructType): Unit = {
-    val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): Integer)
+      conf: Configuration, dataSchema: StructType, requestedSchema: StructType): Unit = {
+    val ids = requestedSchema.map(a => dataSchema.fieldIndex(a.name): Integer)
     val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip
     HiveShim.appendReadColumns(conf, sortedIDs, sortedNames)
   }
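
Note on the fix above: Hive-written ORC files often expose positional physical column names (_col0, _col1, ...) rather than the logical names in the metastore schema, so looking up a requested field by its Spark name against the file's ObjectInspector can return null. The patched unwrapOrcStructs therefore falls back to "_col" plus the field's index in dataSchema. The following standalone Scala sketch illustrates just that resolution rule, with a plain Map standing in for the ObjectInspector; the object and method names (FieldResolutionSketch, resolve) are illustrative and not part of the patch.

object FieldResolutionSketch {
  // Physical field names as a Hive-written ORC file may expose them (positional).
  val physicalFields: Map[String, Int] = Map("_col0" -> 0, "_col1" -> 1, "_col2" -> 2)

  // Logical (metastore) column order, i.e. what the patch calls dataSchema.
  val dataSchema: Seq[String] = Seq("click_id", "search_id", "uid")

  // Resolve a requested column: try its logical name first, then fall back to
  // the positional name derived from its index in dataSchema.
  def resolve(name: String): Option[Int] =
    physicalFields.get(name).orElse {
      val idx = dataSchema.indexOf(name)
      if (idx >= 0) physicalFields.get("_col" + idx) else None
    }

  def main(args: Array[String]): Unit = {
    Seq("search_id", "click_id", "dummy").foreach { name =>
      // search_id -> Some(1), click_id -> Some(0), dummy -> None (read back as null)
      println(s"$name -> ${resolve(name)}")
    }
  }
}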

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 57 additions & 1 deletion
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -2050,4 +2050,60 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
   }
+
+  test("SPARK-18355 Use Spark schema to read ORC table instead of ORC file schema") {
+    val client = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+
+    Seq("true", "false").foreach { value =>
+      withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> value) {
+        withTempDatabase { db =>
+          client.runSqlHive(
+            s"""
+               |CREATE TABLE $db.t(
+               |  click_id string,
+               |  search_id string,
+               |  uid bigint)
+               |PARTITIONED BY (
+               |  ts string,
+               |  hour string)
+               |STORED AS ORC
+             """.stripMargin)
+
+          client.runSqlHive(
+            s"""
+               |INSERT INTO TABLE $db.t
+               |PARTITION (ts = '98765', hour = '01')
+               |VALUES (12, 2, 12345)
+             """.stripMargin
+          )
+
+          checkAnswer(
+            sql(s"SELECT * FROM $db.t"),
+            Row("12", "2", 12345, "98765", "01"))
+
+          client.runSqlHive(s"ALTER TABLE $db.t ADD COLUMNS (dummy string)")
+
+          checkAnswer(
+            sql(s"SELECT click_id, search_id FROM $db.t"),
+            Row("12", "2"))
+
+          checkAnswer(
+            sql(s"SELECT search_id, click_id FROM $db.t"),
+            Row("2", "12"))
+
+          checkAnswer(
+            sql(s"SELECT search_id FROM $db.t"),
+            Row("2"))
+
+          checkAnswer(
+            sql(s"SELECT dummy, click_id FROM $db.t"),
+            Row(null, "12"))
+
+          checkAnswer(
+            sql(s"SELECT * FROM $db.t"),
+            Row("12", "2", 12345, null, "98765", "01"))
+        }
+      }
+    }
+  }
 }
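
The test above exercises the case where ALTER TABLE ... ADD COLUMNS leaves existing partition files without the new column: with the fix, reads are driven by the Spark (metastore) schema, so the missing dummy column comes back as null instead of the query failing. The column-pruning side of the change, setRequiredColumns, now derives the pushed-down column ids from that same Spark schema. A minimal standalone sketch of the id computation follows; the helper name requiredColumnIds and the plain Seq[String] schemas are simplifications for illustration, not the patched API.

object RequiredColumnsSketch {
  // Compute (sorted ids, matching names) from the table's Spark schema, mirroring
  // what the patched setRequiredColumns does before HiveShim.appendReadColumns.
  def requiredColumnIds(
      dataSchema: Seq[String],
      requestedSchema: Seq[String]): (Seq[Int], Seq[String]) = {
    val ids = requestedSchema.map(name => dataSchema.indexOf(name))
    ids.zip(requestedSchema).sorted.unzip
  }

  def main(args: Array[String]): Unit = {
    val dataSchema = Seq("click_id", "search_id", "uid", "dummy")
    // Requesting (dummy, click_id) yields ids sorted positionally, names reordered to match.
    println(requiredColumnIds(dataSchema, Seq("dummy", "click_id")))
    // -> (List(0, 3),List(click_id, dummy))
  }
}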
