
Commit b0319c2

[SPARK-19107][SQL] support creating hive table with DataFrameWriter and Catalog
## What changes were proposed in this pull request?

After unifying the CREATE TABLE syntax in #16296, it is now straightforward to support creating Hive tables with `DataFrameWriter` and `Catalog`. This PR essentially removes the Hive provider check in `DataFrameWriter.saveAsTable` and `Catalog.createExternalTable`, and adds tests.

## How was this patch tested?

New tests in `HiveDDLSuite`.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16487 from cloud-fan/hive-table.
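As a usage sketch (not part of the diff), the following mirrors the new tests in `HiveDDLSuite` to show what this change enables. The `SparkSession` named `spark` (with Hive support), the table names, and the path are assumptions for illustration only.

```scala
// Sketch only: assumes a Hive-enabled SparkSession bound to `spark`.
import spark.implicits._

// DataFrameWriter: create a Hive serde table directly from a DataFrame.
Seq(1 -> "a").toDF("i", "j")
  .write.format("hive").option("fileFormat", "avro").saveAsTable("hive_tbl")

// Catalog: create an external Hive serde table with an explicit schema.
spark.catalog.createExternalTable(
  "hive_ext_tbl",                                            // illustrative table name
  "hive",                                                    // now accepted as a provider
  new org.apache.spark.sql.types.StructType().add("i", "int"),
  Map("path" -> "/tmp/hive_ext_tbl", "fileFormat" -> "parquet"))
```

Reading or writing raw files with `format("hive")` remains disallowed: the new checks in `DataFrameReader.load` and `DataFrameWriter.save` throw an `AnalysisException` for that case.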
1 parent b0e5840 commit b0319c2

6 files changed (+93, −40 lines)


sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 9 additions & 5 deletions
@@ -28,6 +28,7 @@ import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
 import org.apache.spark.sql.execution.LogicalRDD
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.InferSchema
@@ -143,6 +144,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    */
   @scala.annotation.varargs
   def load(paths: String*): DataFrame = {
+    if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
+      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
+        "read files of Hive data source directly.")
+    }
+
     sparkSession.baseRelationToDataFrame(
       DataSource.apply(
         sparkSession,
@@ -160,7 +166,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    */
   def jdbc(url: String, table: String, properties: Properties): DataFrame = {
     // properties should override settings in extraOptions.
-    this.extraOptions = this.extraOptions ++ properties.asScala
+    this.extraOptions ++= properties.asScala
     // explicit url and dbtable should override all
     this.extraOptions += (JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
     format("jdbc").load()
@@ -469,9 +475,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @since 1.4.0
    */
   def table(tableName: String): DataFrame = {
-    Dataset.ofRows(sparkSession,
-      sparkSession.sessionState.catalog.lookupRelation(
-        sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)))
+    sparkSession.table(tableName)
   }
 
   /**
@@ -550,6 +554,6 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
 
   private var userSpecifiedSchema: Option[StructType] = None
 
-  private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+  private val extraOptions = new scala.collection.mutable.HashMap[String, String]
 
 }

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 7 additions & 4 deletions
@@ -205,6 +205,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @since 1.4.0
    */
   def save(): Unit = {
+    if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
+      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
+        "write files of Hive data source directly.")
+    }
+
     assertNotBucketed("save")
     val dataSource = DataSource(
       df.sparkSession,
@@ -361,10 +366,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   private def saveAsTable(tableIdent: TableIdentifier): Unit = {
-    if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
-      throw new AnalysisException("Cannot create hive serde table with saveAsTable API")
-    }
-
     val catalog = df.sparkSession.sessionState.catalog
     val tableExists = catalog.tableExists(tableIdent)
     val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase)
@@ -385,6 +386,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     }
     EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match {
       // Only do the check if the table is a data source table (the relation is a BaseRelation).
+      // TODO(cloud-fan): also check hive table relation here when we support overwrite mode
+      // for creating hive tables.
       case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
         throw new AnalysisException(
           s"Cannot overwrite table $tableName that is also being read from")

sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala

Lines changed: 0 additions & 4 deletions
@@ -347,10 +347,6 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
       source: String,
       schema: StructType,
       options: Map[String, String]): DataFrame = {
-    if (source.toLowerCase == "hive") {
-      throw new AnalysisException("Cannot create hive serde table with createExternalTable API.")
-    }
-
     val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
     val tableDesc = CatalogTable(
       identifier = tableIdent,

sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala

Lines changed: 0 additions & 7 deletions
@@ -322,13 +322,6 @@ class CatalogSuite
     assert(e2.message == "Cannot create a file-based external data source table without path")
   }
 
-  test("createExternalTable should fail if provider is hive") {
-    val e = intercept[AnalysisException] {
-      spark.catalog.createExternalTable("tbl", "HiVe", Map.empty[String, String])
-    }
-    assert(e.message.contains("Cannot create hive serde table with createExternalTable API"))
-  }
-
   test("dropTempView should not un-cache and drop metastore table if a same-name table exists") {
     withTable("same_name") {
       spark.range(10).write.saveAsTable("same_name")

sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala

Lines changed: 0 additions & 20 deletions
@@ -1169,26 +1169,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     }
   }
 
-  test("save API - format hive") {
-    withTempDir { dir =>
-      val path = dir.getCanonicalPath
-      val e = intercept[ClassNotFoundException] {
-        spark.range(10).write.format("hive").mode(SaveMode.Ignore).save(path)
-      }.getMessage
-      assert(e.contains("Failed to find data source: hive"))
-    }
-  }
-
-  test("saveAsTable API - format hive") {
-    val tableName = "tab1"
-    withTable(tableName) {
-      val e = intercept[AnalysisException] {
-        spark.range(10).write.format("hive").mode(SaveMode.Overwrite).saveAsTable(tableName)
-      }.getMessage
-      assert(e.contains("Cannot create hive serde table with saveAsTable API"))
-    }
-  }
-
   test("create a temp view using hive") {
     val tableName = "tab1"
     withTable (tableName) {

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 77 additions & 0 deletions
@@ -33,6 +33,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.StructType
 
 class HiveDDLSuite
   extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
@@ -1289,4 +1290,80 @@ class HiveDDLSuite
       }
     }
   }
+
+  test("create hive serde table with Catalog") {
+    withTable("t") {
+      withTempDir { dir =>
+        val df = spark.catalog.createExternalTable(
+          "t",
+          "hive",
+          new StructType().add("i", "int"),
+          Map("path" -> dir.getCanonicalPath, "fileFormat" -> "parquet"))
+        assert(df.collect().isEmpty)
+
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(DDLUtils.isHiveTable(table))
+        assert(table.storage.inputFormat ==
+          Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
+        assert(table.storage.outputFormat ==
+          Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+        assert(table.storage.serde ==
+          Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+
+        sql("INSERT INTO t SELECT 1")
+        checkAnswer(spark.table("t"), Row(1))
+      }
+    }
+  }
+
+  test("create hive serde table with DataFrameWriter.saveAsTable") {
+    withTable("t", "t2") {
+      Seq(1 -> "a").toDF("i", "j")
+        .write.format("hive").option("fileFormat", "avro").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row(1, "a"))
+
+      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+      assert(DDLUtils.isHiveTable(table))
+      assert(table.storage.inputFormat ==
+        Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"))
+      assert(table.storage.outputFormat ==
+        Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"))
+      assert(table.storage.serde ==
+        Some("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))
+
+      sql("INSERT INTO t SELECT 2, 'b'")
+      checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
+
+      val e = intercept[AnalysisException] {
+        Seq(1 -> "a").toDF("i", "j").write.format("hive").partitionBy("i").saveAsTable("t2")
+      }
+      assert(e.message.contains("A Create Table As Select (CTAS) statement is not allowed " +
+        "to create a partitioned table using Hive"))
+
+      val e2 = intercept[AnalysisException] {
+        Seq(1 -> "a").toDF("i", "j").write.format("hive").bucketBy(4, "i").saveAsTable("t2")
+      }
+      assert(e2.message.contains("Creating bucketed Hive serde table is not supported yet"))
+
+      val e3 = intercept[AnalysisException] {
+        spark.table("t").write.format("hive").mode("overwrite").saveAsTable("t")
+      }
+      assert(e3.message.contains(
+        "CTAS for hive serde tables does not support append or overwrite semantics"))
    }
+  }
+
+  test("read/write files with hive data source is not allowed") {
+    withTempDir { dir =>
+      val e = intercept[AnalysisException] {
+        spark.read.format("hive").load(dir.getAbsolutePath)
+      }
+      assert(e.message.contains("Hive data source can only be used with tables"))
+
+      val e2 = intercept[AnalysisException] {
+        Seq(1 -> "a").toDF("i", "j").write.format("hive").save(dir.getAbsolutePath)
+      }
+      assert(e2.message.contains("Hive data source can only be used with tables"))
+    }
+  }
 }
