[SPARK-16628][SQL] Translate file-based relation schema when file schema is inconsistent with catalog schema #14365


Closed · wants to merge 8 commits
Changes from all commits
@@ -36,7 +36,7 @@ import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.{HiveInspectors, HiveShim}
import org.apache.spark.sql.hive.{HiveInspectors, HiveSessionState, HiveShim}
import org.apache.spark.sql.sources.{Filter, _}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
@@ -126,6 +126,9 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

val convertHiveOrc =
sparkSession.sessionState.asInstanceOf[HiveSessionState].convertMetastoreOrc

(file: PartitionedFile) => {
val conf = broadcastedHadoopConf.value.value

@@ -137,7 +140,31 @@
Iterator.empty
} else {
val physicalSchema = maybePhysicalSchema.get
OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema)

// SPARK-16628: an Orc table created by Hive may not store column names correctly in the
// Orc files, so the physical schema can mismatch the required schema (which comes from the
// metastore schema) and reading the Orc files will fail.
// To fix this, we assume the metastore schema `dataSchema` matches `physicalSchema`
// column by column, disregarding column names. If it does not, we throw an exception
// suggesting that users disable the conversion of Hive Orc tables.
val physicalRequiredSchema =
if (convertHiveOrc && OrcRelation.isMismatchSchema(physicalSchema, requiredSchema)) {
require(physicalSchema.length == dataSchema.length,
s"physical schema $physicalSchema in Orc file doesn't match metastore schema " +
s"$dataSchema, please disable spark.sql.hive.convertMetastoreOrc to work around " +
"this problem.")
physicalSchema.fields.zip(dataSchema.fields).map { case (pf, df) =>
require(pf.dataType == df.dataType,
s"Column $pf in Orc file doesn't match column $df in metastore schema, " +
"please disable spark.sql.hive.convertMetastoreOrc to work around this problem.")
}
OrcRelation.setRequiredColumns(conf, dataSchema, requiredSchema)
StructType(
requiredSchema.map(a => dataSchema.fieldIndex(a.name)).map(physicalSchema(_)))
} else {
OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema)
requiredSchema
}

val orcRecordReader = {
val job = Job.getInstance(conf)
@@ -161,7 +188,7 @@
// Unwraps `OrcStruct`s to `UnsafeRow`s
OrcRelation.unwrapOrcStructs(
conf,
requiredSchema,
physicalRequiredSchema,
Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]),
recordsIterator)
}
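
For reference, a standalone sketch (an editor's illustration, not code from this PR) of the positional translation performed above, with hypothetical schemas in which the metastore column name `value2` differs from the file column name `value`:

```scala
import org.apache.spark.sql.types._

// Hypothetical schemas for illustration only.
val dataSchema = StructType(Seq(       // metastore (catalog) schema
  StructField("value2", StringType), StructField("key", IntegerType)))
val physicalSchema = StructType(Seq(   // schema stored in the ORC file
  StructField("value", StringType), StructField("key", IntegerType)))
val requiredSchema = StructType(Seq(   // columns requested by the query
  StructField("value2", StringType)))

// Look up each requested column's position in the metastore schema, then take
// the field at that position from the physical (file) schema.
val physicalRequiredSchema = StructType(
  requiredSchema.map(a => dataSchema.fieldIndex(a.name)).map(physicalSchema(_)))
// physicalRequiredSchema contains StructField(value,StringType,true)
```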
@@ -303,6 +330,10 @@ private[orc] object OrcRelation extends HiveInspectors {
maybeStructOI.map(unwrap).getOrElse(Iterator.empty)
}

def isMismatchSchema(physicalSchema: StructType, requestedSchema: StructType): Boolean = {
requestedSchema.forall(a => physicalSchema.getFieldIndex(a.name).isEmpty)

Review comment (Member): Maybe, the following?

!requestedSchema.forall(a => physicalSchema.getFieldIndex(a.name).isDefined)

Review comment (Member): Oh, never mind. It's really about checking whether all requested columns are not matched.

}
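
As the exchange above settles, the predicate is true only when none of the requested column names can be found in the physical schema. A small standalone illustration (an editor's sketch; it uses `fieldNames` from the public API since `getFieldIndex` may not be accessible outside Spark's own packages):

```scala
import org.apache.spark.sql.types._

// True only when *none* of the requested column names exist in the file schema.
def isMismatchSchema(physicalSchema: StructType, requestedSchema: StructType): Boolean =
  requestedSchema.forall(a => !physicalSchema.fieldNames.contains(a.name))

val physical = StructType(Seq(StructField("value", StringType)))

// Names line up, so no translation is needed.
assert(!isMismatchSchema(physical, StructType(Seq(StructField("value", StringType)))))
// No requested name is found in the file, so positional matching kicks in.
assert(isMismatchSchema(physical, StructType(Seq(StructField("value2", StringType)))))
```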

def setRequiredColumns(
conf: Configuration, physicalSchema: StructType, requestedSchema: StructType): Unit = {
val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): Integer)
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.ql.io.orc.{OrcStruct, SparkOrcNewRecordReader}
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.datasources.{LogicalRelation, RecordReaderIterator}
@@ -590,6 +591,79 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
}
}

test("ORC conversion when metastore schema does not match schema stored in ORC files") {
withTempView("single") {
val singleRowDF = Seq((0, "foo")).toDF("key", "value")
singleRowDF.createOrReplaceTempView("single")

withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> "true") {
withTable("dummy_orc", "dummy_orc2", "dummy_orc3") {
withTempPath { dir =>
val path = dir.getCanonicalPath

// Create a Metastore ORC table and insert data into it.
spark.sql(
s"""
|CREATE TABLE dummy_orc(value STRING)
|PARTITIONED BY (key INT)
|STORED AS ORC
|LOCATION '$path'
""".stripMargin)

spark.sql(
s"""
|INSERT INTO TABLE dummy_orc
|PARTITION(key=0)
|SELECT value FROM single
""".stripMargin)

val df = spark.sql("SELECT key, value FROM dummy_orc WHERE key=0")
checkAnswer(df, singleRowDF)

// Create a Metastore ORC table whose schema uses different column names.
spark.sql(
s"""
|CREATE EXTERNAL TABLE dummy_orc2(value2 STRING)
|PARTITIONED BY (key INT)
|STORED AS ORC
|LOCATION '$path'
""".stripMargin)

spark.sql("ALTER TABLE dummy_orc2 ADD PARTITION(key=0)")

// The output schema of the relation comes from the Metastore, not from the ORC file.
val df2 = spark.sql("SELECT key, value2 FROM dummy_orc2 WHERE key=0 AND value2='foo'")
checkAnswer(df2, singleRowDF)

val queryExecution = df2.queryExecution
queryExecution.analyzed.collectFirst {
case _: LogicalRelation => ()
}.getOrElse {
fail(s"Expecting the query plan to convert orc to data sources, " +
s"but got:\n$queryExecution")
}

// When the column types in the Orc files do not match the metastore schema,
// we can't convert the Hive metastore Orc table to a data source table.
spark.sql(
s"""
|CREATE EXTERNAL TABLE dummy_orc3(value2 INT)
|PARTITIONED BY (key INT)
|STORED AS ORC
|LOCATION '$path'
""".stripMargin)

spark.sql("ALTER TABLE dummy_orc3 ADD PARTITION(key=0)")
val errorMessage = intercept[SparkException] {
spark.sql("SELECT key, value2 FROM dummy_orc3 WHERE key=0 AND value2=1").count()
}.getMessage
assert(errorMessage.contains("please disable spark.sql.hive.convertMetastoreOrc"))
}
}
}
}
}
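
The assertion above checks for the workaround named in the error message. A hedged sketch of what that looks like from a user session (assuming a Hive-enabled SparkSession named `spark` and the `dummy_orc3` table created above):

```scala
// Fall back to the Hive reader instead of the native ORC data source, as the
// error message suggests; depending on the Spark version this can also be done
// with `SET spark.sql.hive.convertMetastoreOrc=false` in SQL.
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "false")
spark.sql("SELECT key, value2 FROM dummy_orc3 WHERE key = 0").show()
```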

test("Empty schema does not read data from ORC file") {
val data = Seq((1, 1), (2, 2))
withOrcFile(data) { path =>