
Commit 01a8e46

gatorsmile authored and sameeragarwal committed
[SPARK-21769][SQL] Add a table-specific option for always respecting schemas inferred/controlled by Spark SQL
## What changes were proposed in this pull request?

For Hive-serde tables, we always respect the schema stored in the Hive metastore, because that schema could be altered by other engines sharing the same metastore. Thus, when the two schemas differ (ignoring nullability and case), we trust the metastore-controlled schema for Hive-serde tables. However, in some scenarios the Hive metastore can INCORRECTLY overwrite the schema, e.g., when the table's serde differs from the Hive metastore's built-in serde. The proposed solution is to introduce a table-specific option for such scenarios: for a given table, users can make Spark always respect the Spark-inferred/controlled schema instead of trusting the metastore-controlled schema. By default, we trust the Hive metastore-controlled schema.

## How was this patch tested?

Added a cross-version test case.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #19003 from gatorsmile/respectSparkSchema.
1 parent 43d71d9 commit 01a8e46
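
For illustration, the new option is set per table through its serde properties. The sketch below mirrors the DDL used in the cross-version test further down; the table name and the existing `spark` session are assumptions, not part of this patch:

    // Hedged usage sketch: opt one Hive-serde table into the Spark-controlled
    // schema. Assumes an existing SparkSession named `spark`; the table name
    // `decimal_avro` is illustrative.
    spark.sql(
      """
        |CREATE TABLE decimal_avro
        |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
        |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true')
        |STORED AS
        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
      """.stripMargin)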

File tree

4 files changed: +97 -5 lines changed
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SourceOptions.scala

Lines changed: 50 additions & 0 deletions

@@ -0,0 +1,50 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

/**
 * Options for the data source.
 */
class SourceOptions(
    @transient private val parameters: CaseInsensitiveMap[String])
  extends Serializable {
  import SourceOptions._

  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))

  // A flag to disable saving a data source table's metadata in hive compatible way.
  val skipHiveMetadata: Boolean = parameters
    .get(SKIP_HIVE_METADATA).map(_.toBoolean).getOrElse(DEFAULT_SKIP_HIVE_METADATA)

  // A flag to always respect the Spark schema restored from the table properties
  val respectSparkSchema: Boolean = parameters
    .get(RESPECT_SPARK_SCHEMA).map(_.toBoolean).getOrElse(DEFAULT_RESPECT_SPARK_SCHEMA)
}

object SourceOptions {

  val SKIP_HIVE_METADATA = "skipHiveMetadata"
  val DEFAULT_SKIP_HIVE_METADATA = false

  val RESPECT_SPARK_SCHEMA = "respectSparkSchema"
  val DEFAULT_RESPECT_SPARK_SCHEMA = false

}
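
A minimal sketch of how these flags resolve, assuming the class above is on the classpath; the map contents are illustrative storage properties:

    import org.apache.spark.sql.execution.datasources.SourceOptions

    // Key lookup is case-insensitive because the constructor wraps the map
    // in CaseInsensitiveMap.
    val opts = new SourceOptions(Map("RespectSparkSchema" -> "true"))
    assert(opts.respectSparkSchema)  // explicitly enabled via the property
    assert(!opts.skipHiveMetadata)   // absent, so the default (false) applies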

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

Lines changed: 6 additions & 5 deletions
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.internal.StaticSQLConf._
@@ -260,6 +260,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = {
     // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
     val provider = table.provider.get
+    val options = new SourceOptions(table.storage.properties)

     // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type
     // support, no column nullability, etc., we should do some extra works before saving table
@@ -325,11 +326,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat

     val qualifiedTableName = table.identifier.quotedString
     val maybeSerde = HiveSerDe.sourceToSerDe(provider)
-    val skipHiveMetadata = table.storage.properties
-      .getOrElse("skipHiveMetadata", "false").toBoolean

     val (hiveCompatibleTable, logMessage) = maybeSerde match {
-      case _ if skipHiveMetadata =>
+      case _ if options.skipHiveMetadata =>
        val message =
          s"Persisting data source table $qualifiedTableName into Hive metastore in" +
            "Spark SQL specific format, which is NOT compatible with Hive."
@@ -737,6 +736,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }

   private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = {
+    val options = new SourceOptions(table.storage.properties)
     val hiveTable = table.copy(
       provider = Some(DDLUtils.HIVE_PROVIDER),
       tracksPartitionsInCatalog = true)
@@ -748,7 +748,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val partColumnNames = getPartitionColumnsFromTableProperties(table)
     val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)

-    if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema)) {
+    if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema) ||
+        options.respectSparkSchema) {
       hiveTable.copy(
         schema = reorderedSchema,
         partitionColumnNames = partColumnNames,
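
To make the changed condition concrete, here is a simplified standalone sketch of the restore decision. It is not Spark's internal code: `DataType.equalsIgnoreCaseAndNullability` is approximated by comparing lower-cased top-level field names and types, and `chooseSchema` is a hypothetical helper:

    import org.apache.spark.sql.types._

    // Rough stand-in for DataType.equalsIgnoreCaseAndNullability: compare
    // top-level field names case-insensitively and ignore nullability flags.
    def roughlyEqual(a: StructType, b: StructType): Boolean =
      a.fields.map(f => (f.name.toLowerCase, f.dataType))
        .sameElements(b.fields.map(f => (f.name.toLowerCase, f.dataType)))

    def chooseSchema(
        sparkSchema: StructType,      // restored from the table properties
        metastoreSchema: StructType,  // reported by the Hive metastore
        respectSparkSchema: Boolean): StructType = {
      // After this patch: trust the Spark-side schema when the two agree
      // (modulo case/nullability) OR when the table opts in explicitly.
      if (roughlyEqual(sparkSchema, metastoreSchema) || respectSparkSchema) sparkSchema
      else metastoreSchema
    }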
Binary file not shown (the avroDecimal test resource used by the new test below).

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala

Lines changed: 41 additions & 0 deletions
@@ -763,6 +763,47 @@ class VersionsSuite extends SparkFunSuite with Logging {
     }
   }

+  test(s"$version: read avro file containing decimal") {
+    val url = Thread.currentThread().getContextClassLoader.getResource("avroDecimal")
+    val location = new File(url.getFile)
+
+    val tableName = "tab1"
+    val avroSchema =
+      """{
+        |  "name": "test_record",
+        |  "type": "record",
+        |  "fields": [ {
+        |    "name": "f0",
+        |    "type": [
+        |      "null",
+        |      {
+        |        "precision": 38,
+        |        "scale": 2,
+        |        "type": "bytes",
+        |        "logicalType": "decimal"
+        |      }
+        |    ]
+        |  } ]
+        |}
+      """.stripMargin
+    withTable(tableName) {
+      versionSpark.sql(
+        s"""
+           |CREATE TABLE $tableName
+           |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+           |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true')
+           |STORED AS
+           |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+           |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+           |LOCATION '$location'
+           |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
+         """.stripMargin
+      )
+      assert(versionSpark.table(tableName).collect() ===
+        versionSpark.sql("SELECT 1.30").collect())
+    }
+  }
+
   // TODO: add more tests.
  }
 }
