
[SPARK-6322][SQL] CTAS should consider the case where no file format or storage handler is given #5014


Closed · wants to merge 10 commits
@@ -137,12 +137,13 @@ case class InsertIntoTable(
   }
 }
 
-case class CreateTableAsSelect[T](
-    databaseName: Option[String],
-    tableName: String,
-    child: LogicalPlan,
-    allowExisting: Boolean,
-    desc: Option[T] = None) extends UnaryNode {
+abstract class CreateTableAsSelect extends UnaryCommand {
+  self: Product =>
+
+  def databaseName: Option[String]
+  def tableName: String
+  def child: LogicalPlan
+  def allowExisting: Boolean
Contributor:
Instead of having this trait here, can we just make the implementation below a Command?

Contributor:
I guess we cannot use Command because a Command is a LeafNode. If we make it a Command, the child logical plan will not be analyzed and optimized.
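To see why, here is a minimal sketch with hypothetical names and Catalyst's types reduced to bare essentials: tree transformations recurse only through a node's children, and a LeafNode reports none, so a query plan held in an ordinary field of a leaf command would never be visited by the analyzer.

// Simplified sketch, not Catalyst's real API: rules walk `children` only.
abstract class TreeNode { def children: Seq[TreeNode] }
abstract class LeafNode extends TreeNode {
  final override def children: Seq[TreeNode] = Nil
}
// If CTAS were a LeafNode keeping its query in a plain field, recursion
// through `children` would never reach `query`, leaving it unresolved.
case class LeafCtas(query: TreeNode) extends LeafNode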

Contributor:
I do think it will be good to have a different type of Command that is not a LeafNode.

   override def output: Seq[Attribute] = Seq.empty[Attribute]
   override lazy val resolved: Boolean = databaseName != None && childrenResolved
 }
@@ -27,3 +27,11 @@ abstract class Command extends LeafNode {
   self: Product =>
   def output: Seq[Attribute] = Seq.empty
 }
+
+/**
+ * A UnaryNode that represents a non-query command to be executed by the system.
+ */
+abstract class UnaryCommand extends UnaryNode {
+  self: Product =>
+  def output: Seq[Attribute] = Seq.empty
+}
@@ -141,7 +141,7 @@ class DataFrame private[sql](
     // happen right away to let these side effects take place eagerly.
     case _: Command |
          _: InsertIntoTable |
-         _: CreateTableAsSelect[_] |
+         _: CreateTableAsSelect |
          _: CreateTableUsingAsSelect |
          _: WriteToFile =>
       LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
@@ -25,6 +25,7 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import org.apache.hadoop.hive.metastore.api.{FieldSchema, Partition => TPartition, Table => TTable}
 import org.apache.hadoop.hive.metastore.{TableType, Warehouse}
 import org.apache.hadoop.hive.ql.metadata._
+import org.apache.hadoop.hive.ql.lib.Node
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
@@ -600,7 +601,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
     // TODO extra is in type of ASTNode which means the logical plan is not resolved
     // Need to think about how to implement the CreateTableAsSelect.resolved
-    case CreateTableAsSelect(db, tableName, child, allowExisting, Some(extra: ASTNode)) =>
+    case CreateHiveTableAsSelect(db, tableName, child, allowExisting, extra) =>
       val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
       val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
 
@@ -619,7 +620,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         }
       }
 
-      sa.analyze(extra, new Context(hive.hiveconf))
+      sa.analyze(extra.asInstanceOf[ASTNode], new Context(hive.hiveconf))
       Some(sa.getQB().getTableDesc)
     }
 
@@ -658,34 +659,6 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     }
 
     case p: LogicalPlan if p.resolved => p
-
-    case p @ CreateTableAsSelect(db, tableName, child, allowExisting, None) =>
-      val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
-      if (hive.convertCTAS) {
-        if (dbName.isDefined) {
-          throw new AnalysisException(
-            "Cannot specify database name in a CTAS statement " +
-              "when spark.sql.hive.convertCTAS is set to true.")
-        }
-
-        val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
-        CreateTableUsingAsSelect(
-          tblName,
-          hive.conf.defaultDataSourceName,
-          temporary = false,
-          mode,
-          options = Map.empty[String, String],
-          child
-        )
-      } else {
-        val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
-        execution.CreateTableAsSelect(
-          databaseName,
-          tableName,
-          child,
-          allowExisting,
-          None)
-      }
   }
Contributor:
Deleting this is probably not right. Although HiveQL.scala always passes in a valid value, this pattern match is required by CREATE TABLE USING, which is not HiveQL.

Member Author:
It is mostly a duplicate of the previous case, so the two have been combined.

Contributor:
OK, I see your point, but it seems more reasonable to keep both CreateTableAsSelect pattern matches; perhaps we could instead remove the duplicated code from the first matching block. What do you think?
Hmm, and hasStorageSpec does not seem right to me in the previous implementation.

Member Author:
Basically they are almost the same code except for the check for a file format or storage handler, so I don't think we need to keep two pattern matches here.

Contributor:
@chenghao-intel Why was hasStorageSpec not implemented correctly? Note that CREATE TABLE USING will use CreateTableUsingAsSelect. Also, slightly off topic, can you remind me why desc in CreateTableAsSelect is an Option?

Contributor:
CreateTableAsSelect was designed to support CTAS not only for HiveQL but also for other SQL dialects. See https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala#L128, where the type of desc is desc: Option[T].
In the HiveQL case, T is ASTNode and desc will never be None: we can always extract the SerDe information from it via the Hive SemanticAnalyzer, even if no storage spec is specified in the DDL, since the SemanticAnalyzer supplies a default SerDe in that case. So it seems hasStorageSpec would be false only if the table already existed?

The change @viirya made seems reasonable to me. The only concern is: what if hive.convertCTAS is false and no storage spec is specified in the DDL? How do we extract the default SerDe, given that desc is None?
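For concreteness, the two DDL shapes under discussion look like this (illustrative HiveQL held in Scala strings; the table and source names are made up):

// With an explicit storage spec, the SemanticAnalyzer records the named
// format's SerDe; without one, Hive falls back to the default text format.
val ctasWithStorageSpec = "CREATE TABLE t STORED AS RCFILE AS SELECT * FROM src"
val ctasWithoutStorageSpec = "CREATE TABLE t AS SELECT * FROM src"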

Contributor:
For a data source table, we do not use CreateTableAsSelect because it has its own semantics, such as provider, temporary, mode, and options. So I think it is better for it to have its own logical plan instead of mixing these into CreateTableAsSelect.

For hasStorageSpec, the name could be better. The explanation of hive.convertCTAS can be found at https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala#L72. It is mainly used to avoid saving data to text and sequence files. For now, Hive will not set a SerDe when a text file or sequence file is used, i.e. crtTbl.getSerName will be null (see fillStorageFormat and fillDefaultStorageFormat).
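Given that observation, a predicate along these lines is presumably what hasStorageSpec was meant to capture. This is only a sketch assuming Hive's CreateTableDesc API, where getSerName returns null when the DDL names no SerDe; the function name is made up:

import org.apache.hadoop.hive.ql.plan.CreateTableDesc

// Sketch: treat a CTAS as having an explicit storage spec only when the
// SemanticAnalyzer recorded a SerDe; text/sequence-file defaults leave it null.
def hasExplicitStorageSpec(crtTbl: CreateTableDesc): Boolean =
  crtTbl != null && crtTbl.getSerName != null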

Contributor:
OK, I see. I agree we should not mix CreateTableAsSelect and CreateTableUsingAsSelect. And we probably also need to refactor CreateTableAsSelect so that it is used only within HiveContext. What do you think?

Contributor:
Yeah. Sounds good.

 }

@@ -764,6 +737,14 @@ private[hive] case class InsertIntoHiveTable(
   }
 }
 
+private[hive] case class CreateHiveTableAsSelect(
+    databaseName: Option[String],
+    tableName: String,
+    child: LogicalPlan,
+    allowExisting: Boolean,
+    desc: Node) extends CreateTableAsSelect {
+}
+
 private[hive] case class MetastoreRelation
   (databaseName: String, tableName: String, alias: Option[String])
   (val table: TTable, val partitions: Seq[TPartition])
@@ -566,8 +566,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
"TOK_TABLEPROPERTIES"),
children)
val (db, tableName) = extractDbNameTableName(tableNameParts)

CreateTableAsSelect(db, tableName, nodeToPlan(query), allowExisting != None, Some(node))
CreateHiveTableAsSelect(db, tableName, nodeToPlan(query), allowExisting != None, node)

     // If its not a "CREATE TABLE AS" like above then just pass it back to hive as a native command.
     case Token("TOK_CREATETABLE", _) => NativePlaceholder