
[SPARK-16948][SQL] Use metastore schema instead of inferring schema for ORC in HiveMetastoreCatalog #14537


Closed · rbalamohan wants to merge 20 commits

Commits (changes shown from all commits):
5721b88  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 8, 2016)
4ae92d8  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 9, 2016)
75bca1b  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 9, 2016)
97d21f6  Merge branch 'master' of https://github.com/apache/spark into SPARK-1…  (rbalamohan, Aug 22, 2016)
4004c0a  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 22, 2016)
9a8838a  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 23, 2016)
eb8a955  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 23, 2016)
70cf84d  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 24, 2016)
c9d677b  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 24, 2016)
7385a06  Revert "[SPARK-16948][SQL] Querying empty partitioned orc tables thro…  (rbalamohan, Aug 24, 2016)
1746853  Revert "Revert "[SPARK-16948][SQL] Querying empty partitioned orc tab…  (rbalamohan, Aug 24, 2016)
5f7e5ad  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 24, 2016)
e8e2d70  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 25, 2016)
0f901aa  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 25, 2016)
0772a96  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 25, 2016)
6ff7e5d  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 25, 2016)
fc14e2d  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 25, 2016)
9ecb2ed  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Aug 26, 2016)
fa71370  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Sep 22, 2016)
e39715e  [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep…  (rbalamohan, Sep 22, 2016)
Files changed

HiveMetastoreCatalog.scala
@@ -191,7 +191,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       defaultSource: FileFormat,
       fileFormatClass: Class[_ <: FileFormat],
       fileType: String): LogicalRelation = {
-    val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
+    val metastoreSchema = metastoreRelation.schema
     val tableIdentifier =
       QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
     val bucketSpec = None  // We don't support hive bucketed tables, only ones we write out.
@@ -237,21 +237,24 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           new Path(metastoreRelation.catalogTable.storage.locationUri.get),
           partitionSpec)

-      val inferredSchema = if (fileType.equals("parquet")) {
-        val inferredSchema =
-          defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles())
-        inferredSchema.map { inferred =>
-          ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred)
-        }.getOrElse(metastoreSchema)
-      } else {
-        defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()).get
-      }
+      val schema = fileType match {
+        case "parquet" =>
+          val inferredSchema =
+            defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles())
+          inferredSchema.map { inferred =>
+            ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred)

Contributor: I'm a little worried here. If the table is partitioned, metastoreSchema will always contain partition columns, and thus the merged schema will contain partition columns too. This means we always read Parquet files with partition columns; I think we may have a hidden bug.
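
To make the worry concrete, here is a minimal sketch; `mergeWithMetastore` is a hypothetical stand-in for `ParquetFileFormat.mergeMetastoreParquetSchema`, not its real implementation, and the column names are invented:

```scala
import org.apache.spark.sql.types._

// Metastore schema of a partitioned table: data columns plus partition column `p`.
val metastoreSchema = StructType(Seq(
  StructField("key", IntegerType),
  StructField("value", StringType),
  StructField("p", IntegerType)))

// Schema inferred from the Parquet footers: the files carry no `p` column,
// because partition values normally live only in the directory names.
val inferredSchema = StructType(Seq(
  StructField("key", IntegerType),
  StructField("value", StringType)))

// Hypothetical stand-in for the merge: keep every metastore field, preferring
// the inferred definition when the files provide one.
def mergeWithMetastore(metastore: StructType, inferred: StructType): StructType =
  StructType(metastore.map(f => inferred.find(_.name == f.name).getOrElse(f)))

mergeWithMetastore(metastoreSchema, inferredSchema).fieldNames
// Array(key, value, p) -- `p` survives into the merged data schema even though
// no Parquet data file actually contains such a column.
```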

+          }.getOrElse(metastoreSchema)
+        case "orc" =>
+          metastoreSchema

Contributor: I went through the code path again; it seems we must infer the schema here. In the metastore we store the table schema and the partition columns. HadoopFsRelation needs a dataSchema, which is the real schema of the data files. Normally that is just the table schema excluding partition columns, but Spark SQL supports a special case: partition columns can also exist in the data files (see the doc for HadoopFsRelation.dataSchema). This information is not preserved in the metastore, so we have to infer the data schema from the data files here.
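
A spark-shell style sketch of that special case (it assumes an ORC-capable build, i.e. Hive support enabled; the path and column names are illustrative):

```scala
import spark.implicits._

val dir = java.nio.file.Files.createTempDirectory("orc-sketch").toString

// Hive-style data where the partition column `p` is ALSO written into the
// data file itself, not only encoded in the `p=1` directory name.
Seq((1, "a", 1)).toDF("key", "value", "p").write.orc(s"$dir/p=1")

// Partition discovery recovers `p` from the path, but only inspecting the
// files reveals that the data schema itself contains `p` too. The metastore
// records nothing about this, which is why the data schema must be inferred.
spark.read.orc(dir).printSchema()
```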

Contributor: @cloud-fan As we discussed offline yesterday, this is probably fine, since ORC supports column pruning. When reading an ORC file in a partitioned table, the reader always ignores partition columns stored inside the physical file and uses the values encoded in the partition directory path.
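
A sketch of why pruning makes this safe, reusing `spark` and `dir` from the sketch above (the exact plan text varies across Spark versions):

```scala
// Even though the physical file under p=1 contains a `p` column, the scan only
// asks the ORC reader for the data columns it needs; `p` is pruned from the
// file read and filled in from the `p=1` directory value instead.
val df = spark.read.orc(dir)
df.select("key", "p").explain()
// Expect the scan's ReadSchema to mention only `key`.
```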

Contributor: We already have a test case covering this here.

+        case _ =>
+          throw new RuntimeException(s"Cannot convert a $fileType to a data source table")
+      }

       val relation = HadoopFsRelation(
         sparkSession = sparkSession,
         location = fileCatalog,
         partitionSchema = partitionSchema,
-        dataSchema = inferredSchema,
+        dataSchema = schema,
         bucketSpec = bucketSpec,
         fileFormat = defaultSource,
         options = options)
@@ -281,7 +284,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         DataSource(
           sparkSession = sparkSession,
           paths = paths,
-          userSpecifiedSchema = Some(metastoreRelation.schema),
+          userSpecifiedSchema = Some(metastoreSchema),
           bucketSpec = bucketSpec,
           options = options,
           className = fileType).resolveRelation(),
OrcQuerySuite.scala

@@ -17,6 +17,7 @@

 package org.apache.spark.sql.hive.orc

+import java.io.File
 import java.nio.charset.StandardCharsets

 import org.scalatest.BeforeAndAfterAll

@@ -372,6 +373,40 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     }
   }

+  test("support empty orc table when converting hive serde table to data source table") {
+    withSQLConf((HiveUtils.CONVERT_METASTORE_ORC.key, "true")) {
+      withTable("empty_orc_partitioned") {
+        sql(
+          """
+            |CREATE TABLE empty_orc_partitioned(key INT, value STRING)
+            |PARTITIONED BY (p INT) STORED AS ORC
+          """.stripMargin)
+
+        val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1)

Contributor: You don't really need .coalesce(1) here, since the created DataFrame wraps a LocalRelation.
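
A quick spark-shell check of that claim (assuming `spark.implicits._` is in scope; the printed plan is abbreviated):

```scala
val emptyDF = Seq.empty[(Int, String)].toDF("key", "value")

// The DataFrame is backed by a LocalRelation: a single local block of rows,
// so there is nothing distributed for coalesce(1) to collapse.
println(emptyDF.queryExecution.logical)
// LocalRelation <empty>, [key#0, value#1]
```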


+        // Query empty table
+        checkAnswer(
+          sql("SELECT key, value FROM empty_orc_partitioned"),
+          emptyDF)
+      }
+
+      withTable("empty_orc") {
+        sql(
+          """
+            |CREATE TABLE empty_orc(key INT, value STRING)
+            |STORED AS ORC
+          """.stripMargin)
+
+        val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1)
+
+        // Query empty table
+        checkAnswer(
+          sql("SELECT key, value FROM empty_orc"),
+          emptyDF)
+      }
+    }
+  }
+

test("SPARK-10623 Enable ORC PPD") {
withTempPath { dir =>
withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
Expand Down