Skip to content

[SPARK-30535][SQL] Migrate ALTER TABLE commands to the new framework #27243

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -755,12 +755,14 @@ class Analyzer(
.map(view => i.copy(table = view))
.getOrElse(i)
case u @ UnresolvedTable(ident) =>
lookupTempView(ident).foreach { _ =>
u.failAnalysis(s"${ident.quoted} is a temp view not table.")
}
u
lookupTempView(ident)
.map(_ => UnresolvedTableWithViewExists(
ResolvedView(ident.asIdentifier, isTempView = true)))
.getOrElse(u)
case u @ UnresolvedTableOrView(ident) =>
lookupTempView(ident).map(_ => ResolvedView(ident.asIdentifier)).getOrElse(u)
lookupTempView(ident)
.map(_ => ResolvedView(ident.asIdentifier, isTempView = true))
.getOrElse(u)
}

def lookupTempView(identifier: Seq[String]): Option[LogicalPlan] = {
Expand Down Expand Up @@ -814,14 +816,6 @@ class Analyzer(
lookupV2Relation(u.multipartIdentifier)
.map(v2Relation => i.copy(table = v2Relation))
.getOrElse(i)

case alter @ AlterTable(_, _, u: UnresolvedV2Relation, _) =>
CatalogV2Util.loadRelation(u.catalog, u.tableName)
.map(rel => alter.copy(table = rel))
.getOrElse(alter)

case u: UnresolvedV2Relation =>
CatalogV2Util.loadRelation(u.catalog, u.tableName).getOrElse(u)
}

/**
Expand Down Expand Up @@ -888,8 +882,7 @@ class Analyzer(

case u @ UnresolvedTable(identifier) =>
lookupTableOrView(identifier).map {
case v: ResolvedView =>
u.failAnalysis(s"${v.identifier.quoted} is a view not table.")
case v: ResolvedView => UnresolvedTableWithViewExists(v)
case table => table
}.getOrElse(u)

Expand All @@ -902,7 +895,7 @@ class Analyzer(
case SessionCatalogAndIdentifier(catalog, ident) =>
CatalogV2Util.loadTable(catalog, ident).map {
case v1Table: V1Table if v1Table.v1Table.tableType == CatalogTableType.VIEW =>
ResolvedView(ident)
ResolvedView(ident, isTempView = false)
case table =>
ResolvedTable(catalog.asTableCatalog, ident, table)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnType}
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -86,6 +87,20 @@ trait CheckAnalysis extends PredicateHelper {
}

def checkAnalysis(plan: LogicalPlan): Unit = {
// Analysis that needs to be performed top down can be added here.
plan.foreach {
case p if p.analyzed => // Skip already analyzed sub-plans

case alter: AlterTable =>
alter.table match {
case u @ UnresolvedTableWithViewExists(view) if !view.isTempView =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the error message for temp view?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current/existing tests expect the message "[t] is a temp view not a table."

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we use the same error message as permanent views?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I will do this as a follow-up. Thanks @cloud-fan for the review!

u.failAnalysis("Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead")
case _ =>
}

case _ => // Analysis successful!
}

// We transform up and order the rules so as to catch the first possible failure instead
// of the result of cascading resolution failures.
plan.foreachUp {
Expand All @@ -104,23 +119,13 @@ trait CheckAnalysis extends PredicateHelper {
case u: UnresolvedRelation =>
u.failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")

case u: UnresolvedTableWithViewExists =>
val viewKind = if (u.view.isTempView) { "temp view" } else { "view" }
u.failAnalysis(s"${u.view.identifier.quoted} is a $viewKind not a table.")

case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) =>
failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}")

case u: UnresolvedV2Relation if isView(u.originalNameParts) =>
u.failAnalysis(
s"Invalid command: '${u.originalNameParts.quoted}' is a view not a table.")

case u: UnresolvedV2Relation =>
u.failAnalysis(s"Table not found: ${u.originalNameParts.quoted}")

case AlterTable(_, _, u: UnresolvedV2Relation, _) if isView(u.originalNameParts) =>
u.failAnalysis(
s"Invalid command: '${u.originalNameParts.quoted}' is a view not a table.")

case AlterTable(_, _, u: UnresolvedV2Relation, _) =>
failAnalysis(s"Table not found: ${u.originalNameParts.quoted}")

case operator: LogicalPlan =>
// Check argument data types of higher-order functions downwards first.
// If the arguments of the higher-order functions are resolved but the type check fails,
Expand Down Expand Up @@ -419,8 +424,9 @@ trait CheckAnalysis extends PredicateHelper {
case _ =>
}

case alter: AlterTable if alter.childrenResolved =>
val table = alter.table
case alter: AlterTable
if alter.childrenResolved && alter.table.isInstanceOf[ResolvedTable] =>
val table = alter.table.asInstanceOf[ResolvedTable].table
def findField(operation: String, fieldName: Array[String]): StructField = {
// include collections because structs nested in maps and arrays may be altered
val field = table.schema.findNestedField(fieldName, includeCollections = true)
Expand Down Expand Up @@ -469,6 +475,8 @@ trait CheckAnalysis extends PredicateHelper {
throw new AnalysisException(
s"Cannot change nullable column to non-nullable: $fieldName")
}
case update: UpdateColumnPosition =>
findField("update", update.fieldNames)
case rename: RenameColumn =>
findField("rename", rename.fieldNames)
case update: UpdateColumnComment =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, SupportsNamespaces, TableCatalog, TableChange}
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog}

/**
* Resolves catalogs from the multi-part identifiers in SQL statements, and convert the statements
Expand All @@ -32,71 +32,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
import org.apache.spark.sql.connector.catalog.CatalogV2Util._

override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case AlterTableAddColumnsStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
val changes = cols.map { col =>
TableChange.addColumn(
col.name.toArray,
col.dataType,
col.nullable,
col.comment.orNull,
col.position.orNull)
}
createAlterTable(nameParts, catalog, tbl, changes)

case a @ AlterTableAlterColumnStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
val colName = a.column.toArray
val typeChange = a.dataType.map { newDataType =>
TableChange.updateColumnType(colName, newDataType)
}
val nullabilityChange = a.nullable.map { nullable =>
TableChange.updateColumnNullability(colName, nullable)
}
val commentChange = a.comment.map { newComment =>
TableChange.updateColumnComment(colName, newComment)
}
val positionChange = a.position.map { newPosition =>
TableChange.updateColumnPosition(colName, newPosition)
}
createAlterTable(
nameParts,
catalog,
tbl,
typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange)

case AlterTableRenameColumnStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), col, newName) =>
val changes = Seq(TableChange.renameColumn(col.toArray, newName))
createAlterTable(nameParts, catalog, tbl, changes)

case AlterTableDropColumnsStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
val changes = cols.map(col => TableChange.deleteColumn(col.toArray))
createAlterTable(nameParts, catalog, tbl, changes)

case AlterTableSetPropertiesStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), props) =>
val changes = props.map { case (key, value) =>
TableChange.setProperty(key, value)
}.toSeq
createAlterTable(nameParts, catalog, tbl, changes)

// TODO: v2 `UNSET TBLPROPERTIES` should respect the ifExists flag.
case AlterTableUnsetPropertiesStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), keys, _) =>
val changes = keys.map(key => TableChange.removeProperty(key))
createAlterTable(nameParts, catalog, tbl, changes)

case AlterTableSetLocationStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), partitionSpec, newLoc) =>
if (partitionSpec.nonEmpty) {
throw new AnalysisException(
"ALTER TABLE SET LOCATION does not support partition for v2 tables.")
}
val changes = Seq(TableChange.setProperty(TableCatalog.PROP_LOCATION, newLoc))
createAlterTable(nameParts, catalog, tbl, changes)

case AlterViewSetPropertiesStatement(
NonSessionCatalogAndTable(catalog, tbl), props) =>
throw new AnalysisException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.parser.ParserUtils
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode}
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.types.{DataType, Metadata, StructType}

/**
Expand Down Expand Up @@ -60,28 +59,6 @@ object UnresolvedRelation {
UnresolvedRelation(tableIdentifier.database.toSeq :+ tableIdentifier.table)
}

/**
 * A variant of [[UnresolvedRelation]] which can only be resolved to a v2 relation
 * (`DataSourceV2Relation`), not a v1 relation or temp view.
 *
 * @param originalNameParts the original table identifier name parts before the catalog is resolved.
 * @param catalog The catalog which the table should be looked up from.
 * @param tableName The name of the table to look up in `catalog`.
 */
case class UnresolvedV2Relation(
originalNameParts: Seq[String],
catalog: TableCatalog,
tableName: Identifier)
extends LeafNode with NamedRelation {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

// Display name derived from the original (pre-resolution) identifier parts.
override def name: String = originalNameParts.quoted

// No attributes until the underlying table is actually resolved.
override def output: Seq[Attribute] = Nil

// Always unresolved: the analyzer must replace this node with a concrete relation.
override lazy val resolved = false
}

/**
* An inline table that has not been resolved yet. Once resolved, it is turned by the analyzer into
* a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsNamespaces, Table, TableCatalog}

/**
Expand All @@ -41,6 +41,16 @@ case class UnresolvedTable(multipartIdentifier: Seq[String]) extends LeafNode {
override def output: Seq[Attribute] = Nil
}

/**
 * Holds the resolved view in a scenario where a table is expected but the identifier
 * is resolved to a (temp) view instead. The node stays unresolved so that analysis
 * fails on it with a clear "is a view not a table" error message.
 */
case class UnresolvedTableWithViewExists(view: ResolvedView) extends LeafNode {
// Deliberately never resolved: this node only exists to produce an analysis error.
override lazy val resolved: Boolean = false

override def output: Seq[Attribute] = Nil
}

/**
* Holds the name of a table or view that has yet to be looked up in a catalog. It will
* be resolved to [[ResolvedTable]] or [[ResolvedView]] during analysis.
Expand Down Expand Up @@ -71,6 +81,6 @@ case class ResolvedTable(catalog: TableCatalog, identifier: Identifier, table: T
*/
// TODO: create a generic representation for temp view, v1 view and v2 view, after we add view
// support to v2 catalog. For now we only need the identifier to fallback to v1 command.
case class ResolvedView(identifier: Identifier) extends LeafNode {
case class ResolvedView(identifier: Identifier, isTempView: Boolean) extends LeafNode {
override def output: Seq[Attribute] = Nil
}
Loading