
[SPARK-19152][SQL] DataFrameWriter.saveAsTable support hive append #16552
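For orientation, this is the user-facing behavior the patch enables, reconstructed from the tests added below (the table name `t` is illustrative; `spark.implicits._` is assumed to be imported, as in the suites):

    // Before this patch, appending to a Hive serde table through the
    // DataFrameWriter API was rejected ("CTAS for hive serde tables does not
    // support append semantics." / "Saving data in the Hive serde table ... is
    // not supported yet. Please use the insertInto() API as an alternative.").
    Seq(1 -> "a").toDF("i", "j")
      .write.format("hive").option("fileFormat", "avro").saveAsTable("t")

    // After this patch, Append mode inserts into the existing Hive serde table:
    Seq(2 -> "b").toDF("i", "j")
      .write.format("hive").mode("append").saveAsTable("t")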


Status: Closed · wants to merge 20 commits
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
-import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
 import org.apache.spark.sql.types._

 case class CreateTable(
@@ -65,6 +65,11 @@ case class CreateTempViewUsing(
   }

   def run(sparkSession: SparkSession): Seq[Row] = {
+    if (provider.toLowerCase == DDLUtils.HIVE_PROVIDER) {
+      throw new AnalysisException("Hive data source can only be used with tables, " +
+        "you can't use it with CREATE TEMP VIEW USING")
+    }
+
     val dataSource = DataSource(
       sparkSession,
       userSpecifiedSchema = userSpecifiedSchema,
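The effect of the new guard above, as exercised by the updated MetastoreDataSourcesSuite test further down (a sketch, not part of the patch):

    // Previously this failed with ClassNotFoundException ("Failed to find data
    // source: hive"); with the explicit check it now fails fast and clearly:
    spark.sql("CREATE TEMPORARY VIEW tab1 (col1 int) USING hive")
    // AnalysisException: Hive data source can only be used with tables,
    // you can't use it with CREATE TEMP VIEW USING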
@@ -51,7 +51,7 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
         // will catch it and return the original plan, so that the analyzer can report table not
         // found later.
         val isFileFormat = classOf[FileFormat].isAssignableFrom(dataSource.providingClass)
-        if (!isFileFormat) {
+        if (!isFileFormat || dataSource.className.toLowerCase == DDLUtils.HIVE_PROVIDER) {
           throw new AnalysisException("Unsupported data source type for direct query on files: " +
             s"${u.tableIdentifier.database.get}")
         }
@@ -112,11 +112,6 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
         throw new AnalysisException("Saving data into a view is not allowed.")
       }

-      if (DDLUtils.isHiveTable(existingTable)) {
-        throw new AnalysisException(s"Saving data in the Hive serde table $tableName is " +
-          "not supported yet. Please use the insertInto() API as an alternative.")
-      }
-
       // Check if the specified data source match the data source of the existing table.
       val existingProvider = DataSource.lookupDataSource(existingTable.provider.get)
       val specifiedProvider = DataSource.lookupDataSource(tableDesc.provider.get)
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
 org.apache.spark.sql.hive.orc.OrcFileFormat
+org.apache.spark.sql.hive.execution.HiveFileFormat
Review comment (Contributor): is this auto-generated?

@@ -85,18 +85,11 @@ class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
       InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)

     case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
-      // Currently `DataFrameWriter.saveAsTable` doesn't support the Append mode of hive serde
-      // tables yet.
-      if (mode == SaveMode.Append) {
-        throw new AnalysisException(
-          "CTAS for hive serde tables does not support append semantics.")
-      }
-
       val dbName = tableDesc.identifier.database.getOrElse(session.catalog.currentDatabase)
       CreateHiveTableAsSelectCommand(
         tableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))),
         query,
-        mode == SaveMode.Ignore)
+        mode)
   }

   /**
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution

 import scala.util.control.NonFatal

-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.command.RunnableCommand
@@ -31,13 +31,12 @@ import org.apache.spark.sql.hive.MetastoreRelation
  *
  * @param tableDesc the Table Describe, which may contains serde, storage handler etc.
  * @param query the query whose result will be insert into the new relation
- * @param ignoreIfExists allow continue working if it's already exists, otherwise
- *                       raise exception
+ * @param mode SaveMode
  */
 case class CreateHiveTableAsSelectCommand(
     tableDesc: CatalogTable,
     query: LogicalPlan,
-    ignoreIfExists: Boolean)
+    mode: SaveMode)
   extends RunnableCommand {

   private val tableIdentifier = tableDesc.identifier
@@ -67,7 +66,7 @@ case class CreateHiveTableAsSelectCommand(
       withFormat
     }

-    sparkSession.sessionState.catalog.createTable(withSchema, ignoreIfExists = false)
+    sparkSession.sessionState.catalog.createTable(withSchema, ignoreIfExists = true)
Review comment (Contributor): looks like we don't need to build withSchema anymore, the schema will be set in AnalyzeCreateTable

     // Get the Metastore Relation
     sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
@@ -80,11 +79,18 @@ case class CreateHiveTableAsSelectCommand(
       // add the relation into catalog, just in case of failure occurs while data
       // processing.
       if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
-        if (ignoreIfExists) {
-          // table already exists, will do nothing, to keep consistent with Hive
-        } else {
+        assert(mode != SaveMode.Overwrite,
+          s"Expect the table $tableIdentifier has been dropped when the save mode is Overwrite")
+
+        if (mode == SaveMode.ErrorIfExists) {
           throw new AnalysisException(s"$tableIdentifier already exists.")
         }
+        if (mode == SaveMode.Ignore) {
+          // Since the table already exists and the save mode is Ignore, we will just return.
+          return Seq.empty
+        }
+        sparkSession.sessionState.executePlan(InsertIntoTable(
+          metastoreRelation, Map(), query, overwrite = false, ifNotExists = false)).toRdd
       } else {
         try {
           sparkSession.sessionState.executePlan(InsertIntoTable(
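A condensed paraphrase of the exists-branch above (the names whenTableExists and runAppend are mine, not the patch's):

    def whenTableExists(mode: SaveMode): Seq[Row] = mode match {
      case SaveMode.Overwrite =>
        // Unreachable here: the assert documents that the table was already
        // dropped before the CTAS runs in Overwrite mode.
        throw new IllegalStateException("table should have been dropped")
      case SaveMode.ErrorIfExists =>
        throw new AnalysisException(s"$tableIdentifier already exists.")
      case SaveMode.Ignore =>
        Seq.empty // keep the existing data, do nothing
      case SaveMode.Append =>
        runAppend() // plans InsertIntoTable(..., overwrite = false) and returns Seq.empty
    }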
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, OutputWriterFactory}
 import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil}
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
+import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableJobConf

@@ -43,7 +44,13 @@ import org.apache.spark.util.SerializableJobConf
  *
  * TODO: implement the read logic.
  */
-class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat with Logging {
+class HiveFileFormat(fileSinkConf: FileSinkDesc)
+  extends FileFormat with DataSourceRegister with Logging {
+
+  def this() = this(null)
+
+  override def shortName(): String = "hive"
+
   override def inferSchema(
       sparkSession: SparkSession,
       options: Map[String, String],
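Why the patch needs both the no-arg constructor and the new META-INF/services entry (my reading of the lookup mechanism, simplified): Spark resolves short provider names by instantiating every registered DataSourceRegister via java.util.ServiceLoader, which requires a public no-arg constructor, and matching shortName() case-insensitively. A minimal sketch of that resolution:

    import java.util.ServiceLoader
    import scala.collection.JavaConverters._
    import org.apache.spark.sql.sources.DataSourceRegister

    // Simplified: the real DataSource.lookupDataSource also tries the provider
    // string as a fully qualified class name and reports ambiguous matches.
    def lookupByShortName(provider: String): Option[Class[_]] = {
      val loader = Thread.currentThread().getContextClassLoader
      ServiceLoader.load(classOf[DataSourceRegister], loader).asScala
        .find(_.shortName().equalsIgnoreCase(provider))
        .map(_.getClass)
    }

This is what lets DataFrameWriter's format("hive") resolve to HiveFileFormat.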
@@ -419,12 +419,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     sql(s"CREATE TABLE $tableName STORED AS SEQUENCEFILE AS SELECT 1 AS key, 'abc' AS value")

     val df = sql(s"SELECT key, value FROM $tableName")
-    val e = intercept[AnalysisException] {
-      df.write.mode(SaveMode.Append).saveAsTable(tableName)
-    }.getMessage
-    assert(e.contains("Saving data in the Hive serde table default.tab1 is not supported " +
-      "yet. Please use the insertInto() API as an alternative."))
-
     df.write.insertInto(tableName)
     checkAnswer(
       sql(s"SELECT * FROM $tableName"),
@@ -1167,16 +1161,17 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {

   test("create a temp view using hive") {
     val tableName = "tab1"
-    withTable (tableName) {
-      val e = intercept[ClassNotFoundException] {
+    withTable(tableName) {
+      val e = intercept[AnalysisException] {
         sql(
           s"""
             |CREATE TEMPORARY VIEW $tableName
             |(col1 int)
             |USING hive
           """.stripMargin)
       }.getMessage
-      assert(e.contains("Failed to find data source: hive"))
+      assert(e.contains("Hive data source can only be used with tables, you can't use it with " +
+        "CREATE TEMP VIEW USING"))
     }
   }

@@ -1319,7 +1319,7 @@ class HiveDDLSuite
   }

   test("create hive serde table with DataFrameWriter.saveAsTable") {
-    withTable("t", "t2") {
+    withTable("t", "t1") {
       Seq(1 -> "a").toDF("i", "j")
         .write.format("hive").option("fileFormat", "avro").saveAsTable("t")
       checkAnswer(spark.table("t"), Row(1, "a"))
@@ -1350,11 +1350,8 @@
       assert(table.storage.serde ==
         Some("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))

-      sql("INSERT INTO t SELECT 2, 'b'")
-      checkAnswer(spark.table("t"), Row(9, "x") :: Row(2, "b") :: Nil)
-
       val e2 = intercept[AnalysisException] {
-        Seq(1 -> "a").toDF("i", "j").write.format("hive").bucketBy(4, "i").saveAsTable("t2")
+        Seq(1 -> "a").toDF("i", "j").write.format("hive").bucketBy(4, "i").saveAsTable("t1")
       }
       assert(e2.message.contains("Creating bucketed Hive serde table is not supported yet"))

@@ -1365,6 +1362,35 @@
     }
   }

+  test("append data to hive serde table") {
+    withTable("t", "t1") {
+      Seq(1 -> "a").toDF("i", "j")
+        .write.format("hive").option("fileFormat", "avro").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row(1, "a"))
+
+      sql("INSERT INTO t SELECT 2, 'b'")
+      checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
+
+      Seq(3 -> "c").toDF("i", "j")
+        .write.format("hive").mode("append").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil)
+
+      Seq("c" -> 3).toDF("i", "j")
+        .write.format("hive").mode("append").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c")
+        :: Row(null, "3") :: Nil)
+
+      Seq(4 -> "d").toDF("i", "j").write.saveAsTable("t1")
+
+      val e = intercept[AnalysisException] {
+        Seq(5 -> "e").toDF("i", "j")
+          .write.format("hive").mode("append").saveAsTable("t1")
+      }
+      assert(e.message.contains("The format of the existing table default.t1 is " +
+        "`ParquetFileFormat`. It doesn't match the specified format `HiveFileFormat`."))
+    }
+  }
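One subtlety in the expected answer above (my reading of the test, not text from the PR): appending Seq("c" -> 3) to a table whose schema is (i int, j string) matches columns by position and casts values to the table schema, Hive-style:

    // Incoming row ("c", 3) against schema (i INT, j STRING):
    //   CAST('c' AS INT)   => null  (non-numeric string)
    //   CAST(3 AS STRING)  => "3"
    // hence the expected Row(null, "3").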

test("create partitioned hive serde table as select") {
withTable("t", "t1") {
withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
Expand Down
@@ -1461,6 +1461,23 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     })
   }

+  test("run sql directly on files - hive") {
+    withTempPath(f => {
+      spark.range(100).toDF.write.parquet(f.getCanonicalPath)
+
+      var e = intercept[AnalysisException] {
+        sql(s"select id from hive.`${f.getCanonicalPath}`")
+      }
+      assert(e.message.contains("Unsupported data source type for direct query on files: hive"))
+
+      // data source type is case insensitive
+      e = intercept[AnalysisException] {
+        sql(s"select id from HIVE.`${f.getCanonicalPath}`")
+      }
+      assert(e.message.contains("Unsupported data source type for direct query on files: HIVE"))
+    })
+  }

test("SPARK-8976 Wrong Result for Rollup #1") {
checkAnswer(sql(
"SELECT count(*) AS cnt, key % 5, grouping_id() FROM src GROUP BY key%5 WITH ROLLUP"),
Expand Down