Skip to content

Commit

Permalink
Implement conversion to v1 plans.
Browse files Browse the repository at this point in the history
  • Loading branch information
rdblue committed Mar 31, 2020
1 parent 0522c93 commit 12a17fc
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2411,7 +2411,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging

/**
* Type to keep track of table clauses:
* (partitioning, bucketSpec, properties, options, location, comment).
* (partTransforms, partCols, bucketSpec, properties, options, location, comment, serde).
*/
type TableClauses = (
Seq[Transform], Seq[StructField], Option[BucketSpec], Map[String, String],
Expand Down Expand Up @@ -2869,7 +2869,6 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
checkDuplicateClauses(ctx.createFileFormat, "STORED AS/BY", ctx)
checkDuplicateClauses(ctx.rowFormat, "ROW FORMAT", ctx)
checkDuplicateClauses(ctx.commentSpec(), "COMMENT", ctx)
Expand Down Expand Up @@ -2952,9 +2951,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
*/
override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
if (external) {
operationNotAllowed("CREATE EXTERNAL TABLE ...", ctx)
}

val columns = Option(ctx.colTypeList()).map(visitColTypeList)
val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ case class SerdeInfo(
assert(storedAs.isEmpty || serde.isEmpty,
s"Conflicting STORED AS $storedAs and SERDE $serde values")

def describe: String = {
val serdeString = serde.map(sd => s" SERDE $sd").getOrElse("")
this match {
case SerdeInfo(Some(format), _, _, _) =>
s"STORED AS $format$serdeString"
case SerdeInfo(_, Some((inFormat, outFormat)), _, _) =>
s"INPUTFORMAT $inFormat OUTPUTFORMAT $outFormat$serdeString"
case _ =>
serdeString;
}
}

def merge(other: SerdeInfo): SerdeInfo = {
def getOnly[T](desc: String, left: Option[T], right: Option[T]): Option[T] = {
(left, right) match {
Expand All @@ -90,6 +102,12 @@ case class SerdeInfo(
}
}

object SerdeInfo {
val empty: SerdeInfo = {
SerdeInfo(None, None, None, Map.empty)
}
}

/**
* A CREATE TABLE command, as parsed from SQL.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, RefreshTable}
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBuilder, StructField, StructType}

/**
Expand Down Expand Up @@ -268,47 +268,39 @@ class ResolveSessionCatalog(
// session catalog and the table provider is not v2.
case c @ CreateTableStatement(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
val provider = c.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
if (!DDLUtils.isHiveTable(Some(provider))) {
assertNoCharTypeInSchema(c.tableSchema)
}
val tableDesc = buildCatalogTable(tbl.asTableIdentifier, c.tableSchema,
c.partitioning, c.bucketSpec, c.properties, provider, c.options, c.location,
c.comment, c.ifNotExists)
val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTable(tableDesc, mode, None)
} else {
assertNoCharTypeInSchema(c.tableSchema)
CreateV2Table(
catalog.asTableCatalog,
tbl.asIdentifier,
c.tableSchema,
// convert the bucket spec and add it as a transform
c.partitioning ++ c.bucketSpec.map(_.asTransform),
convertTableProperties(c),
ignoreIfExists = c.ifNotExists)
buildV1Table(tbl.asTableIdentifier, c) match {
case Some(tableDesc) =>
val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTable(tableDesc, mode, None)

case None =>
CreateV2Table(
catalog.asTableCatalog,
tbl.asIdentifier,
c.tableSchema,
// convert the bucket spec and add it as a transform
c.partitioning ++ c.bucketSpec.map(_.asTransform),
convertTableProperties(c),
ignoreIfExists = c.ifNotExists)
}

case c @ CreateTableAsSelectStatement(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
val provider = c.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType,
c.partitioning, c.bucketSpec, c.properties, provider, c.options, c.location,
c.comment, c.ifNotExists)
val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTable(tableDesc, mode, Some(c.asSelect))
} else {
CreateTableAsSelect(
catalog.asTableCatalog,
tbl.asIdentifier,
// convert the bucket spec and add it as a transform
c.partitioning ++ c.bucketSpec.map(_.asTransform),
c.asSelect,
convertTableProperties(c),
writeOptions = c.options,
ignoreIfExists = c.ifNotExists)
buildV1Table(tbl.asTableIdentifier, c) match {
case Some(tableDesc) =>
val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTable(tableDesc, mode, Some(c.asSelect))

case None =>
CreateTableAsSelect(
catalog.asTableCatalog,
tbl.asIdentifier,
// convert the bucket spec and add it as a transform
c.partitioning ++ c.bucketSpec.map(_.asTransform),
c.asSelect,
convertTableProperties(c),
writeOptions = c.options,
ignoreIfExists = c.ifNotExists)
}

// v1 REFRESH TABLE supports temp view.
Expand Down Expand Up @@ -633,6 +625,63 @@ class ResolveSessionCatalog(
case _ => throw new AnalysisException(s"$sql is only supported with temp views or v1 tables.")
}

private def buildV1Table(
ident: TableIdentifier,
c: CreateTableAsSelectStatement): Option[CatalogTable] = {
buildV1Table(
ident, new StructType, c.partitioning, c.bucketSpec, c.properties, c.provider, c.serde,
c.options, c.location, c.comment)
}

private def buildV1Table(
ident: TableIdentifier,
c: CreateTableStatement): Option[CatalogTable] = {
buildV1Table(
ident, c.tableSchema, c.partitioning, c.bucketSpec, c.properties, c.provider, c.serde,
c.options, c.location, c.comment)
}

private def buildV1Table(
table: TableIdentifier,
schema: StructType,
partitioning: Seq[Transform],
bucketSpec: Option[BucketSpec],
properties: Map[String, String],
provider: Option[String],
serdeInfo: Option[SerdeInfo],
options: Map[String, String],
location: Option[String],
comment: Option[String]): Option[CatalogTable] = {
(provider, serdeInfo) match {
case (Some(provider), Some(serde)) =>
throw new AnalysisException(
s"Cannot create table with both USING $provider and ${serde.describe}")

case (None, Some(serde)) =>
Some(buildHiveCatalogTable(
table, schema, partitioning, bucketSpec, properties, serde, options, location,
comment))

case (None, None) if conf.createHiveTableByDefaultEnabled =>
Some(buildHiveCatalogTable(
table, schema, partitioning, bucketSpec, properties, SerdeInfo.empty, options, location,
comment))

case (Some(provider), None) if !isV2Provider(provider) =>
Some(buildCatalogTable(
table, schema, partitioning, bucketSpec, properties, provider, options, location,
comment))

case (None, None) if !isV2Provider(conf.defaultDataSourceName) =>
Some(buildCatalogTable(
table, schema, partitioning, bucketSpec, properties, conf.defaultDataSourceName, options,
location, comment))

case _ =>
None
}
}

private def buildCatalogTable(
table: TableIdentifier,
schema: StructType,
Expand All @@ -642,8 +691,9 @@ class ResolveSessionCatalog(
provider: String,
options: Map[String, String],
location: Option[String],
comment: Option[String],
ifNotExists: Boolean): CatalogTable = {
comment: Option[String]): CatalogTable = {
assertNoCharTypeInSchema(schema)

val storage = CatalogStorageFormat.empty.copy(
locationUri = location.map(CatalogUtils.stringToURI),
properties = options)
Expand All @@ -666,6 +716,57 @@ class ResolveSessionCatalog(
comment = comment)
}

private def buildHiveCatalogTable(
table: TableIdentifier,
schema: StructType,
partitioning: Seq[Transform],
bucketSpec: Option[BucketSpec],
properties: Map[String, String],
serdeInfo: SerdeInfo,
options: Map[String, String],
location: Option[String],
comment: Option[String]): CatalogTable = {
val baseStorage = HiveSerDe.getDefaultStorage(conf).copy(
locationUri = location.map(CatalogUtils.stringToURI),
serde = serdeInfo.serde,
properties = options ++ serdeInfo.serdeProperties)

val storage = (serdeInfo.storedAs, serdeInfo.formatClasses) match {
case (Some(format), None) =>
HiveSerDe.sourceToSerDe(format) match {
case Some(hiveSerDe) =>
baseStorage.copy(
inputFormat = hiveSerDe.inputFormat,
outputFormat = hiveSerDe.outputFormat)
case _ =>
baseStorage
}
case (None, Some((inFormat, outFormat))) =>
baseStorage.copy(
inputFormat = Some(inFormat),
outputFormat = Some(outFormat))

case _ =>
baseStorage
}

val tableType = if (location.isDefined) {
CatalogTableType.EXTERNAL
} else {
CatalogTableType.MANAGED
}

CatalogTable(
identifier = table,
tableType = tableType,
storage = storage,
schema = schema,
partitionColumnNames = partitioning.asPartitionColumns,
bucketSpec = bucketSpec,
properties = properties,
comment = comment)
}

object SessionCatalogAndTable {
def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])] = nameParts match {
case SessionCatalogAndIdentifier(catalog, ident) =>
Expand Down

0 comments on commit 12a17fc

Please sign in to comment.