
Commit e6e3600

dongjoon-hyun authored and cloud-fan committed
[SPARK-14387][SPARK-16628][SPARK-18355][SQL] Use Spark schema to read ORC table instead of ORC file schema
## What changes were proposed in this pull request?

Before Hive 2.0, the ORC file schema can contain invalid column names such as `_col1` and `_col2` instead of the real column names. This is a well-known limitation, and several Apache Spark issues have been filed for it when `spark.sql.hive.convertMetastoreOrc=true`. This PR ignores the ORC file schema and uses the Spark schema instead.

## How was this patch tested?

Pass the newly added test case.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #19470 from dongjoon-hyun/SPARK-18355.
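
For readers skimming the change: the core idea is to resolve the requested columns against the table's Spark (metastore) schema by position, rather than against the physical column names stored in the ORC files. Below is a minimal standalone sketch of that name-to-index mapping; the table schema and object name are illustrative, not taken from this commit.

import org.apache.spark.sql.types._

object ColumnMappingSketch {
  // Hypothetical metastore (Spark) schema for a table whose ORC files,
  // written by Hive < 2.0, store physical column names _col0, _col1, _col2.
  val dataSchema: StructType = StructType(Seq(
    StructField("click_id", StringType),
    StructField("search_id", StringType),
    StructField("uid", LongType)))

  // Map each requested column name to its positional index in the Spark
  // schema; that index also identifies the _col<i> field in the ORC file,
  // so the physical names in the file never need to match.
  def requiredColumnIds(requestedNames: Seq[String]): Seq[Int] =
    requestedNames.map(dataSchema.fieldIndex)

  def main(args: Array[String]): Unit = {
    // e.g. SELECT search_id, uid -> physical columns _col1 and _col2
    println(requiredColumnIds(Seq("search_id", "uid")))  // List(1, 2)
  }
}
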
1 parent 2f00a71 commit e6e3600

File tree

2 files changed (+80, -13 lines)

sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala

Lines changed: 19 additions & 12 deletions
@@ -134,12 +134,11 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
       // case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file
       // using the given physical schema. Instead, we simply return an empty iterator.
-      val maybePhysicalSchema = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf))
-      if (maybePhysicalSchema.isEmpty) {
+      val isEmptyFile = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf)).isEmpty
+      if (isEmptyFile) {
        Iterator.empty
      } else {
-        val physicalSchema = maybePhysicalSchema.get
-        OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema)
+        OrcRelation.setRequiredColumns(conf, dataSchema, requiredSchema)

        val orcRecordReader = {
          val job = Job.getInstance(conf)
@@ -163,6 +162,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
        // Unwraps `OrcStruct`s to `UnsafeRow`s
        OrcRelation.unwrapOrcStructs(
          conf,
+          dataSchema,
          requiredSchema,
          Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]),
          recordsIterator)
@@ -272,25 +272,32 @@ private[orc] object OrcRelation extends HiveInspectors {
  def unwrapOrcStructs(
      conf: Configuration,
      dataSchema: StructType,
+      requiredSchema: StructType,
      maybeStructOI: Option[StructObjectInspector],
      iterator: Iterator[Writable]): Iterator[InternalRow] = {
    val deserializer = new OrcSerde
-    val mutableRow = new SpecificInternalRow(dataSchema.map(_.dataType))
-    val unsafeProjection = UnsafeProjection.create(dataSchema)
+    val mutableRow = new SpecificInternalRow(requiredSchema.map(_.dataType))
+    val unsafeProjection = UnsafeProjection.create(requiredSchema)

    def unwrap(oi: StructObjectInspector): Iterator[InternalRow] = {
-      val (fieldRefs, fieldOrdinals) = dataSchema.zipWithIndex.map {
-        case (field, ordinal) => oi.getStructFieldRef(field.name) -> ordinal
+      val (fieldRefs, fieldOrdinals) = requiredSchema.zipWithIndex.map {
+        case (field, ordinal) =>
+          var ref = oi.getStructFieldRef(field.name)
+          if (ref == null) {
+            ref = oi.getStructFieldRef("_col" + dataSchema.fieldIndex(field.name))
+          }
+          ref -> ordinal
      }.unzip

-      val unwrappers = fieldRefs.map(unwrapperFor)
+      val unwrappers = fieldRefs.map(r => if (r == null) null else unwrapperFor(r))

      iterator.map { value =>
        val raw = deserializer.deserialize(value)
        var i = 0
        val length = fieldRefs.length
        while (i < length) {
-          val fieldValue = oi.getStructFieldData(raw, fieldRefs(i))
+          val fieldRef = fieldRefs(i)
+          val fieldValue = if (fieldRef == null) null else oi.getStructFieldData(raw, fieldRef)
          if (fieldValue == null) {
            mutableRow.setNullAt(fieldOrdinals(i))
          } else {
@@ -306,8 +313,8 @@ private[orc] object OrcRelation extends HiveInspectors {
  }

  def setRequiredColumns(
-      conf: Configuration, physicalSchema: StructType, requestedSchema: StructType): Unit = {
-    val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): Integer)
+      conf: Configuration, dataSchema: StructType, requestedSchema: StructType): Unit = {
+    val ids = requestedSchema.map(a => dataSchema.fieldIndex(a.name): Integer)
    val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip
    HiveShim.appendReadColumns(conf, sortedIDs, sortedNames)
  }
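
The null handling added to `unwrapOrcStructs` above is what lets a newly added column (absent from older ORC files) come back as NULL. The following is a simplified, Hive-free sketch of that fallback lookup, modelling the ORC struct as a plain Map keyed by physical field name; the Map and helper names are hypothetical and stand in for the `StructObjectInspector` calls used in the real code.

object FieldFallbackSketch {
  // Spark schema column names, in table order (dummy was added later).
  val dataSchemaNames = Seq("click_id", "search_id", "uid", "dummy")

  // One "row" of an old ORC file: physical names are _col0.._col2 and the
  // later-added column `dummy` is missing entirely.
  val orcStruct: Map[String, Any] =
    Map("_col0" -> "12", "_col1" -> "2", "_col2" -> 12345L)

  // Look a field up by its logical name first, then fall back to the
  // positional _col<i> name; a miss on both (e.g. `dummy`) yields null,
  // which the row builder turns into setNullAt(ordinal).
  def fieldValue(name: String): Any =
    orcStruct.getOrElse(name,
      orcStruct.getOrElse("_col" + dataSchemaNames.indexOf(name), null))

  def main(args: Array[String]): Unit = {
    println(fieldValue("search_id"))  // 2    (resolved via _col1)
    println(fieldValue("dummy"))      // null (new column, absent in file)
  }
}
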

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 61 additions & 1 deletion
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -2050,4 +2050,64 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
      }
    }
  }
+
+  Seq("orc", "parquet").foreach { format =>
+    test(s"SPARK-18355 Read data from a hive table with a new column - $format") {
+      val client = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+
+      Seq("true", "false").foreach { value =>
+        withSQLConf(
+          HiveUtils.CONVERT_METASTORE_ORC.key -> value,
+          HiveUtils.CONVERT_METASTORE_PARQUET.key -> value) {
+          withTempDatabase { db =>
+            client.runSqlHive(
+              s"""
+                 |CREATE TABLE $db.t(
+                 |  click_id string,
+                 |  search_id string,
+                 |  uid bigint)
+                 |PARTITIONED BY (
+                 |  ts string,
+                 |  hour string)
+                 |STORED AS $format
+               """.stripMargin)
+
+            client.runSqlHive(
+              s"""
+                 |INSERT INTO TABLE $db.t
+                 |PARTITION (ts = '98765', hour = '01')
+                 |VALUES (12, 2, 12345)
+               """.stripMargin
+            )
+
+            checkAnswer(
+              sql(s"SELECT click_id, search_id, uid, ts, hour FROM $db.t"),
+              Row("12", "2", 12345, "98765", "01"))
+
+            client.runSqlHive(s"ALTER TABLE $db.t ADD COLUMNS (dummy string)")
+
+            checkAnswer(
+              sql(s"SELECT click_id, search_id FROM $db.t"),
+              Row("12", "2"))
+
+            checkAnswer(
+              sql(s"SELECT search_id, click_id FROM $db.t"),
+              Row("2", "12"))
+
+            checkAnswer(
+              sql(s"SELECT search_id FROM $db.t"),
+              Row("2"))
+
+            checkAnswer(
+              sql(s"SELECT dummy, click_id FROM $db.t"),
+              Row(null, "12"))
+
+            checkAnswer(
+              sql(s"SELECT click_id, search_id, uid, dummy, ts, hour FROM $db.t"),
+              Row("12", "2", 12345, null, "98765", "01"))
+          }
+        }
+      }
+    }
+  }
 }
