Commit 10326f0

gengliangwang authored and Jackey Lee committed
[SPARK-26571][SQL] Update Hive Serde mapping with canonical name of Parquet and Orc FileFormat
## What changes were proposed in this pull request? Currently Spark table maintains Hive catalog storage format, so that Hive client can read it. In `HiveSerDe.scala`, Spark uses a mapping from its data source to HiveSerde. The mapping is old, we need to update with latest canonical name of Parquet and Orc FileFormat. Otherwise the following queries will result in wrong Serde value in Hive table(default value `org.apache.hadoop.mapred.SequenceFileInputFormat`), and Hive client will fail to read the output table: ``` df.write.format("org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat").saveAsTable(..) ``` ``` df.write.format("org.apache.spark.sql.execution.datasources.orc.OrcFileFormat").saveAsTable(..) ``` This minor PR is to fix the mapping. ## How was this patch tested? Unit test. Closes apache#23491 from gengliangwang/fixHiveSerdeMap. Authored-by: Gengliang Wang <gengliang.wang@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
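For reference, the misconfiguration is easy to observe on a build without this fix. A hypothetical repro (`df` and the table name `t` are illustrative):

```scala
// Hypothetical repro on a build without this fix; df and "t" are illustrative.
df.write
  .format("org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat")
  .saveAsTable("t")

// The catalog entry falls back to the default SerDe mapping, so the table's
// InputFormat shows up as org.apache.hadoop.mapred.SequenceFileInputFormat
// and Hive clients cannot read the Parquet data.
spark.sql("DESCRIBE FORMATTED t").show(100, truncate = false)
```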
1 parent e69b5b0 commit 10326f0

File tree

3 files changed (+20 −29 lines)


sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -74,8 +74,10 @@ object HiveSerDe {
   def sourceToSerDe(source: String): Option[HiveSerDe] = {
     val key = source.toLowerCase(Locale.ROOT) match {
       case s if s.startsWith("org.apache.spark.sql.parquet") => "parquet"
+      case s if s.startsWith("org.apache.spark.sql.execution.datasources.parquet") => "parquet"
       case s if s.startsWith("org.apache.spark.sql.orc") => "orc"
       case s if s.startsWith("org.apache.spark.sql.hive.orc") => "orc"
+      case s if s.startsWith("org.apache.spark.sql.execution.datasources.orc") => "orc"
       case s if s.equals("orcfile") => "orc"
       case s if s.equals("parquetfile") => "parquet"
       case s if s.equals("avrofile") => "avro"
```

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala

Lines changed: 18 additions & 0 deletions
```diff
@@ -159,10 +159,28 @@ class DataSourceWithHiveMetastoreCatalogSuite
       "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
     )),
 
+    "org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat" -> ((
+      "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
+      "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
+      "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
+    )),
+
     "orc" -> ((
       "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
       "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
       "org.apache.hadoop.hive.ql.io.orc.OrcSerde"
+    )),
+
+    "org.apache.spark.sql.hive.orc" -> ((
+      "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+      "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
+      "org.apache.hadoop.hive.ql.io.orc.OrcSerde"
+    )),
+
+    "org.apache.spark.sql.execution.datasources.orc.OrcFileFormat" -> ((
+      "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+      "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
+      "org.apache.hadoop.hive.ql.io.orc.OrcSerde"
     ))
   ).foreach { case (provider, (inputFormat, outputFormat, serde)) =>
     test(s"Persist non-partitioned $provider relation into metastore as managed table") {
```

sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala

Lines changed: 0 additions & 29 deletions
```diff
@@ -21,11 +21,9 @@ import java.io.File
 
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.TestingUDT.{IntervalData, IntervalUDT}
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.datasources.orc.OrcSuite
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -67,33 +65,6 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton {
       """.stripMargin)
   }
 
-  test("SPARK-22972: hive orc source") {
-    val tableName = "normal_orc_as_source_hive"
-    withTable(tableName) {
-      sql(
-        s"""
-           |CREATE TABLE $tableName
-           |USING org.apache.spark.sql.hive.orc
-           |OPTIONS (
-           |  PATH '${new File(orcTableAsDir.getAbsolutePath).toURI}'
-           |)
-         """.stripMargin)
-
-      val tableMetadata = spark.sessionState.catalog.getTableMetadata(
-        TableIdentifier(tableName))
-      assert(tableMetadata.storage.inputFormat ==
-        Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"))
-      assert(tableMetadata.storage.outputFormat ==
-        Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
-      assert(tableMetadata.storage.serde ==
-        Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
-      assert(HiveSerDe.sourceToSerDe("org.apache.spark.sql.hive.orc")
-        .equals(HiveSerDe.sourceToSerDe("orc")))
-      assert(HiveSerDe.sourceToSerDe("org.apache.spark.sql.orc")
-        .equals(HiveSerDe.sourceToSerDe("orc")))
-    }
-  }
-
   test("SPARK-19459/SPARK-18220: read char/varchar column written by Hive") {
     val location = Utils.createTempDir()
     val uri = location.toURI
```