Skip to content

[SPARK-4769] [SQL] CTAS does not work when reading from temporary tables #3336

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -254,15 +254,37 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
* For example, because of a CREATE TABLE X AS statement.
*/
object CreateTables extends Rule[LogicalPlan] {
import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.parse.{QB, ASTNode, SemanticAnalyzer}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// Wait until children are resolved.
case p: LogicalPlan if !p.childrenResolved => p

case CreateTableAsSelect(db, tableName, child, allowExisting, extra) =>
case CreateTableAsSelect(db, tableName, child, allowExisting, Some(extra: ASTNode)) =>
val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)

CreateTableAsSelect(Some(databaseName), tableName, child, allowExisting, extra)
// Get the CreateTableDesc from Hive SemanticAnalyzer
val desc: Option[CreateTableDesc] = if (tableExists(Some(databaseName), tblName)) {
None
} else {
val sa = new SemanticAnalyzer(hive.hiveconf) {
override def analyzeInternal(ast: ASTNode) {
// A hack to intercept the SemanticAnalyzer.analyzeInternal,
// to ignore the SELECT clause of the CTAS
val method = classOf[SemanticAnalyzer].getDeclaredMethod(
"analyzeCreateTable", classOf[ASTNode], classOf[QB])
method.setAccessible(true)
method.invoke(this, ast, this.getQB)
}
}

sa.analyze(extra, new Context(hive.hiveconf))
Some(sa.getQB().getTableDesc)
}

CreateTableAsSelect(Some(databaseName), tblName, child, allowExisting, desc)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.hive

import org.apache.hadoop.hive.ql.parse.ASTNode
import org.apache.hadoop.hive.ql.plan.CreateTableDesc

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
Expand Down Expand Up @@ -181,13 +182,20 @@ private[hive] trait HiveStrategies {
execution.InsertIntoHiveTable(
table, partition, planLater(child), overwrite)(hiveContext) :: Nil
case logical.CreateTableAsSelect(
Some(database), tableName, child, allowExisting, Some(extra: ASTNode)) =>
CreateTableAsSelect(
Some(database), tableName, child, allowExisting, Some(desc: CreateTableDesc)) =>
execution.CreateTableAsSelect(
database,
tableName,
child,
allowExisting,
extra) :: Nil
Some(desc)) :: Nil
case logical.CreateTableAsSelect(Some(database), tableName, child, allowExisting, None) =>
execution.CreateTableAsSelect(
database,
tableName,
child,
allowExisting,
None) :: Nil
case _ => Nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.spark.sql.hive.execution

import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.parse.{SemanticAnalyzer, ASTNode}
import org.apache.hadoop.hive.ql.plan.CreateTableDesc

import org.apache.spark.annotation.Experimental
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Row
Expand All @@ -35,8 +35,7 @@ import org.apache.spark.sql.hive.MetastoreRelation
* @param query the query whose result will be insert into the new relation
* @param allowExisting allow continuing if the table already exists; otherwise
* raise an exception
* @param extra the extra information for this Operator, it should be the
* ASTNode object for extracting the CreateTableDesc.
* @param desc the CreateTableDesc, which may contain serde, storage handler, etc.

*/
@Experimental
Expand All @@ -45,21 +44,16 @@ case class CreateTableAsSelect(
tableName: String,
query: LogicalPlan,
allowExisting: Boolean,
extra: ASTNode) extends LeafNode with Command {
desc: Option[CreateTableDesc]) extends LeafNode with Command {

def output = Seq.empty

private[this] def sc = sqlContext.asInstanceOf[HiveContext]

// A lazy computing of the metastoreRelation
private[this] lazy val metastoreRelation: MetastoreRelation = {
// Get the CreateTableDesc from Hive SemanticAnalyzer
val sa = new SemanticAnalyzer(sc.hiveconf)

sa.analyze(extra, new Context(sc.hiveconf))
val desc = sa.getQB().getTableDesc
// Create Hive Table
sc.catalog.createTable(database, tableName, query.output, allowExisting, Some(desc))
sc.catalog.createTable(database, tableName, query.output, allowExisting, desc)

// Get the Metastore Relation
sc.catalog.lookupRelation(Some(database), tableName, None) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ class SQLQuerySuite extends QueryTest {
checkAnswer(
sql("SELECT f1.f2.f3 FROM nested"),
1)
checkAnswer(sql("CREATE TABLE test_ctas_1234 AS SELECT * from nested"),
Seq.empty[Row])
checkAnswer(
sql("SELECT * FROM test_ctas_1234"),
sql("SELECT * FROM nested").collect().toSeq)

intercept[org.apache.hadoop.hive.ql.metadata.InvalidTableException] {
sql("CREATE TABLE test_ctas_12345 AS SELECT * from notexists").collect()
}
}

test("test CTAS") {
Expand Down