
Commit db00cf9

simplify CreateHiveTableAsSelectCommand
1 parent 752502b commit db00cf9

File tree

4 files changed: +49 -42 lines changed

sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala

Lines changed: 6 additions & 0 deletions
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.{Experimental, InterfaceStability}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.StreamingRelation
 import org.apache.spark.sql.types.StructType
@@ -116,6 +117,11 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * @since 2.0.0
    */
   def load(): DataFrame = {
+    if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
+      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
+        "read files of Hive data source directly.")
+    }
+
     val dataSource =
       DataSource(
         sparkSession,
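
For orientation (not part of the diff): with this guard, asking the streaming reader for the Hive provider now fails fast inside load(), before any path resolution. A minimal sketch of how that surfaces to a caller, assuming a local SparkSession and a placeholder path (it mirrors the e3 case added to HiveDDLSuite below):

    import org.apache.spark.sql.{AnalysisException, SparkSession}

    object HiveStreamReadGuardSketch {
      def main(args: Array[String]): Unit = {
        // Assumed setup: a plain local session; "/tmp/in" is a placeholder path.
        val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
        try {
          // Rejected before any file is touched, so the path does not need to exist.
          spark.readStream.format("hive").load("/tmp/in")
        } catch {
          case e: AnalysisException =>
            println(e.getMessage) // "Hive data source can only be used with tables, ..."
        } finally {
          spark.stop()
        }
      }
    }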

sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala

Lines changed: 6 additions & 0 deletions
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.{Experimental, InterfaceStability}
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, ForeachWriter}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, MemorySink}

@@ -221,6 +222,11 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    * @since 2.0.0
    */
   def start(): StreamingQuery = {
+    if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
+      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
+        "write files of Hive data source directly.")
+    }
+
     if (source == "memory") {
       assertNotPartitioned("memory")
       if (extraOptions.get("queryName").isEmpty) {
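
The write side gets the same treatment (again a sketch, not part of the diff): because the check compares source.toLowerCase against DDLUtils.HIVE_PROVIDER, any casing of "hive" is rejected as soon as start() is called, before a sink or checkpoint is set up. Assuming an existing SparkSession `spark` and placeholder directories that exist, mirroring the e4 test case added below:

    import org.apache.spark.sql.AnalysisException
    import org.apache.spark.sql.types.StructType

    // The upstream query never runs; start() throws on seeing the Hive provider.
    val failure: Option[String] =
      try {
        spark.readStream
          .schema(new StructType())   // empty schema is enough for this sketch
          .parquet("/tmp/in")         // placeholder input directory (assumed to exist)
          .writeStream
          .format("Hive")             // any casing hits the toLowerCase check
          .start("/tmp/out")          // placeholder output path
        None
      } catch {
        case e: AnalysisException => Some(e.getMessage)
      }
    // failure contains "Hive data source can only be used with tables, you can not write files ..."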

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala

Lines changed: 25 additions & 41 deletions
@@ -20,10 +20,10 @@ package org.apache.spark.sql.hive.execution
 import scala.util.control.NonFatal

 import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.hive.MetastoreRelation


 /**
@@ -44,40 +44,6 @@ case class CreateHiveTableAsSelectCommand(
   override def innerChildren: Seq[LogicalPlan] = Seq(query)

   override def run(sparkSession: SparkSession): Seq[Row] = {
-    lazy val metastoreRelation: MetastoreRelation = {
-      import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-      import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
-      import org.apache.hadoop.io.Text
-      import org.apache.hadoop.mapred.TextInputFormat
-
-      val withFormat =
-        tableDesc.withNewStorage(
-          inputFormat =
-            tableDesc.storage.inputFormat.orElse(Some(classOf[TextInputFormat].getName)),
-          outputFormat =
-            tableDesc.storage.outputFormat
-              .orElse(Some(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]].getName)),
-          serde = tableDesc.storage.serde.orElse(Some(classOf[LazySimpleSerDe].getName)),
-          compressed = tableDesc.storage.compressed)
-
-      val withSchema = if (withFormat.schema.isEmpty) {
-        tableDesc.copy(schema = query.schema)
-      } else {
-        withFormat
-      }
-
-      sparkSession.sessionState.catalog.createTable(withSchema, ignoreIfExists = true)
-
-      // Get the Metastore Relation
-      sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
-        case SubqueryAlias(_, r: SimpleCatalogRelation, _) =>
-          val tableMeta = r.metadata
-          MetastoreRelation(tableMeta.database, tableMeta.identifier.table)(tableMeta, sparkSession)
-      }
-    }
-    // TODO ideally, we should get the output data ready first and then
-    // add the relation into catalog, just in case of failure occurs while data
-    // processing.
     if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
       assert(mode != SaveMode.Overwrite,
         s"Expect the table $tableIdentifier has been dropped when the save mode is Overwrite")
@@ -89,12 +55,30 @@ case class CreateHiveTableAsSelectCommand(
       // Since the table already exists and the save mode is Ignore, we will just return.
       return Seq.empty
     }
-      sparkSession.sessionState.executePlan(InsertIntoTable(
-        metastoreRelation, Map(), query, overwrite = false, ifNotExists = false)).toRdd
+
+      sparkSession.sessionState.executePlan(
+        InsertIntoTable(
+          UnresolvedRelation(tableIdentifier),
+          Map(),
+          query,
+          overwrite = false,
+          ifNotExists = false)).toRdd
     } else {
+      // TODO ideally, we should get the output data ready first and then
+      // add the relation into catalog, just in case of failure occurs while data
+      // processing.
+      assert(tableDesc.schema.isEmpty)
+      sparkSession.sessionState.catalog.createTable(
+        tableDesc.copy(schema = query.schema), ignoreIfExists = false)
+
       try {
-        sparkSession.sessionState.executePlan(InsertIntoTable(
-          metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd
+        sparkSession.sessionState.executePlan(
+          InsertIntoTable(
+            UnresolvedRelation(tableIdentifier),
+            Map(),
+            query,
+            overwrite = true,
+            ifNotExists = false)).toRdd
       } catch {
         case NonFatal(e) =>
           // drop the created table.
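
The net effect of this change: the command no longer pre-creates the table and hand-builds a MetastoreRelation just to have something to insert into. On the existing-table path it inserts straight into an UnresolvedRelation, and on the new-table path it creates the table from the query's schema and then inserts the same way, letting the analyzer resolve the target when executePlan runs. Any Hive CTAS exercises this path; a minimal sketch, assuming a Hive-enabled local session and placeholder table names:

    import org.apache.spark.sql.SparkSession

    // Sketch only: requires Hive support on the classpath; names are placeholders.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("ctas-sketch")
      .enableHiveSupport()
      .getOrCreate()

    // Planning a plain CTAS (no USING clause) against the Hive catalog produces
    // CreateHiveTableAsSelectCommand, which now creates `ctas_target` from the
    // query's schema and runs InsertIntoTable(UnresolvedRelation(...), ...).
    spark.sql("CREATE TABLE ctas_target AS SELECT 1 AS id, 'a' AS name")
    spark.sql("SELECT * FROM ctas_target").show()
    spark.stop()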

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 12 additions & 1 deletion
@@ -38,7 +38,7 @@ import org.apache.spark.sql.types.StructType

 class HiveDDLSuite
   extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
-  import spark.implicits._
+  import testImplicits._

   override def afterEach(): Unit = {
     try {
@@ -1425,6 +1425,17 @@ class HiveDDLSuite
         Seq(1 -> "a").toDF("i", "j").write.format("hive").save(dir.getAbsolutePath)
       }
       assert(e2.message.contains("Hive data source can only be used with tables"))
+
+      val e3 = intercept[AnalysisException] {
+        spark.readStream.format("hive").load(dir.getAbsolutePath)
+      }
+      assert(e3.message.contains("Hive data source can only be used with tables"))
+
+      val e4 = intercept[AnalysisException] {
+        spark.readStream.schema(new StructType()).parquet(dir.getAbsolutePath)
+          .writeStream.format("hive").start(dir.getAbsolutePath)
+      }
+      assert(e4.message.contains("Hive data source can only be used with tables"))
     }
   }
