[SPARK-19024][SQL] Implement new approach to write a permanent view #16613

Closed · wants to merge 5 commits

Changes from all commits
@@ -223,25 +223,6 @@ case class CatalogTable(
)
}

/**
* Insert/Update the view query output column names in `properties`.
*/
def withQueryColumnNames(columns: Seq[String]): CatalogTable = {
val props = new mutable.HashMap[String, String]
if (columns.nonEmpty) {
props.put(VIEW_QUERY_OUTPUT_NUM_COLUMNS, columns.length.toString)
columns.zipWithIndex.foreach { case (colName, index) =>
props.put(s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index", colName)
}
}

// We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable,
// while `CatalogTable` should be serializable.
copy(properties = properties.filterNot { case (key, _) =>
key.startsWith(VIEW_QUERY_OUTPUT_PREFIX)
} ++ props)
}

/** Syntactic sugar to update a field in `storage`. */
def withNewStorage(
locationUri: Option[String] = storage.locationUri,
@@ -17,10 +17,10 @@

package org.apache.spark.sql.execution.command

import scala.util.control.NonFatal
import scala.collection.mutable

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions.Alias
@@ -64,9 +64,9 @@ object PersistedView extends ViewType


/**
* Create or replace a view with given query plan. This command will convert the query plan to
* canonicalized SQL string, and store it as view text in metastore, if we need to create a
* permanent view.
* Create or replace a view with given query plan. This command will generate view-specific
* properties (e.g. view default database, view query output column names) and store them as
* table properties in the metastore, if we need to create a permanent view.
*
* @param name the name of this view.
* @param userSpecifiedColumns the output column names and optional comments specified by users,
@@ -75,8 +75,8 @@ object PersistedView extends ViewType
* @param properties the properties of this view.
* @param originalText the original SQL text of this view, can be None if this view is created via
* Dataset API.
* @param child the logical plan that represents the view; this is used to generate a canonicalized
* version of the SQL that can be saved in the catalog.
* @param child the logical plan that represents the view; this is used to generate the logical
* plan for a temporary view and the view schema.
* @param allowExisting if true, and if the view already exists, noop; if false, and if the view
* already exists, throws analysis exception.
* @param replace if true, and if the view already exists, updates it; if false, and if the view
@@ -95,6 +95,8 @@ case class CreateViewCommand(
viewType: ViewType)
extends RunnableCommand {

import ViewHelper._

override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)

if (viewType == PersistedView) {
@@ -137,22 +139,12 @@ case class CreateViewCommand(
// This should be called after `qe.assertAnalyzed()` (i.e., `child` can be resolved)
verifyTemporaryObjectsNotExists(sparkSession)

val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
analyzedPlan
} else {
val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
case (attr, (colName, None)) => Alias(attr, colName)()
case (attr, (colName, Some(colComment))) =>
val meta = new MetadataBuilder().putString("comment", colComment).build()
Alias(attr, colName)(explicitMetadata = Some(meta))
}
sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
}

val catalog = sparkSession.sessionState.catalog
if (viewType == LocalTempView) {
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
catalog.createTempView(name.table, aliasedPlan, overrideIfExists = replace)
} else if (viewType == GlobalTempView) {
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
catalog.createGlobalTempView(name.table, aliasedPlan, overrideIfExists = replace)
} else if (catalog.tableExists(name)) {
val tableMetadata = catalog.getTableMetadata(name)
@@ -163,7 +155,7 @@
throw new AnalysisException(s"$name is not a view")
} else if (replace) {
// Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
catalog.alterTable(prepareTable(sparkSession, aliasedPlan))
catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
} else {
// Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
// exists.
@@ -173,7 +165,7 @@
}
} else {
// Create the view if it doesn't exist.
catalog.createTable(prepareTable(sparkSession, aliasedPlan), ignoreIfExists = false)
catalog.createTable(prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
}
Seq.empty[Row]
}
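For orientation, a minimal usage sketch of how each ViewType reaches its branch in run() above (the session `spark` and table `t` are hypothetical):

// Sketch only; each comment maps the statement to the branch taken above.
spark.sql("CREATE TEMPORARY VIEW tv AS SELECT * FROM t")        // LocalTempView  -> catalog.createTempView
spark.sql("CREATE GLOBAL TEMPORARY VIEW gv AS SELECT * FROM t") // GlobalTempView -> catalog.createGlobalTempView
spark.sql("CREATE VIEW pv AS SELECT * FROM t")                  // PersistedView, no existing view -> catalog.createTable(prepareTable(...))
spark.sql("CREATE OR REPLACE VIEW pv AS SELECT * FROM t")       // PersistedView, view exists + replace -> catalog.alterTable(prepareTable(...))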
@@ -207,29 +199,44 @@ case class CreateViewCommand(
}

/**
* Returns a [[CatalogTable]] that can be used to save in the catalog. This comment canonicalize
* SQL based on the analyzed plan, and also creates the proper schema for the view.
* If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns,
* else return the analyzed plan directly.
*/
private def prepareTable(sparkSession: SparkSession, aliasedPlan: LogicalPlan): CatalogTable = {
val viewSQL: String = new SQLBuilder(aliasedPlan).toSQL

// Validate the view SQL - make sure we can parse it and analyze it.
// If we cannot analyze the generated query, there is probably a bug in SQL generation.
try {
sparkSession.sql(viewSQL).queryExecution.assertAnalyzed()
} catch {
case NonFatal(e) =>
throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
private def aliasPlan(session: SparkSession, analyzedPlan: LogicalPlan): LogicalPlan = {
if (userSpecifiedColumns.isEmpty) {
analyzedPlan
} else {
val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
case (attr, (colName, None)) => Alias(attr, colName)()
case (attr, (colName, Some(colComment))) =>
val meta = new MetadataBuilder().putString("comment", colComment).build()
Alias(attr, colName)(explicitMetadata = Some(meta))
}
session.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
}
}
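To illustrate aliasPlan (the column name and comment here are hypothetical): for a query whose analyzed output is a single attribute `a`, with userSpecifiedColumns == Seq(("x", Some("doc"))), the method builds roughly:

// Sketch, assuming analyzedPlan.output == Seq(a):
val meta = new MetadataBuilder().putString("comment", "doc").build()
val aliased = session.sessionState.executePlan(
  Project(Seq(Alias(a, "x")(explicitMetadata = Some(meta))), analyzedPlan)).analyzed
// The view schema is then derived from `aliased`, i.e. column x (with comment "doc") instead of a.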

/**
* Returns a [[CatalogTable]] that can be used to save in the catalog. It generates the
* view-specific properties (e.g. view default database, view query output column names),
* stores them as properties in the CatalogTable, and also creates the proper schema for the view.
*/
private def prepareTable(session: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = {
if (originalText.isEmpty) {
throw new AnalysisException(
"It is not allowed to create a persisted view from the Dataset API")
}

val newProperties = generateViewProperties(properties, session, analyzedPlan)

CatalogTable(
identifier = name,
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
schema = aliasedPlan.schema,
properties = properties,
schema = aliasPlan(session, analyzedPlan).schema,
properties = newProperties,
viewOriginalText = originalText,
viewText = Some(viewSQL),
viewText = originalText,
Contributor: something we can clean up: Hive will expand the view text, so it needs 2 fields: originalText and viewText. Since we don't expand the view text, but only add table properties, I think we only need a single field viewText in CatalogTable.

Contributor Author: I'm afraid that would require changes in several tens of places, should we do that in a separate PR?

Contributor: yea, in a separate PR.

comment = comment
)
}
@@ -244,14 +251,16 @@ case class CreateViewCommand(
* @param name the name of this view.
* @param originalText the original SQL text of this view. Note that we can only alter a view
* via the SQL API, which means we always have originalText.
* @param query the logical plan that represents the view; this is used to generate a canonicalized
* version of the SQL that can be saved in the catalog.
* @param query the logical plan that represents the view; this is used to generate the new view
* schema.
*/
case class AlterViewAsCommand(
name: TableIdentifier,
originalText: String,
query: LogicalPlan) extends RunnableCommand {

import ViewHelper._

override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)

override def run(session: SparkSession): Seq[Row] = {
@@ -275,21 +284,80 @@ case class AlterViewAsCommand(
throw new AnalysisException(s"${viewMeta.identifier} is not a view.")
}

val viewSQL: String = new SQLBuilder(analyzedPlan).toSQL
// Validate the view SQL - make sure we can parse it and analyze it.
// If we cannot analyze the generated query, there is probably a bug in SQL generation.
try {
session.sql(viewSQL).queryExecution.assertAnalyzed()
} catch {
case NonFatal(e) =>
throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
}
val newProperties = generateViewProperties(viewMeta.properties, session, analyzedPlan)

val updatedViewMeta = viewMeta.copy(
schema = analyzedPlan.schema,
properties = newProperties,
viewOriginalText = Some(originalText),
viewText = Some(viewSQL))
viewText = Some(originalText))

session.sessionState.catalog.alterTable(updatedViewMeta)
}
}
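A usage sketch of the alter path (view and table names are hypothetical):

// Sketch: AlterViewAsCommand re-derives everything from the new analyzed plan.
spark.sql("CREATE VIEW v AS SELECT * FROM t1")
spark.sql("ALTER VIEW v AS SELECT * FROM t2")
// run() above replaces the stored schema, regenerates the view properties via
// generateViewProperties, and stores the new SQL text as both viewOriginalText and viewText.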

object ViewHelper {

import CatalogTable._

/**
* Generate the view default database in `properties`.
*/
private def generateViewDefaultDatabase(databaseName: String): Map[String, String] = {
Map(VIEW_DEFAULT_DATABASE -> databaseName)
}

/**
* Generate the view query output column names in `properties`.
*/
private def generateQueryColumnNames(columns: Seq[String]): Map[String, String] = {
val props = new mutable.HashMap[String, String]
if (columns.nonEmpty) {
props.put(VIEW_QUERY_OUTPUT_NUM_COLUMNS, columns.length.toString)
columns.zipWithIndex.foreach { case (colName, index) =>
props.put(s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index", colName)
}
}
props.toMap
}

/**
* Remove the view query output column names in `properties`.
*/
private def removeQueryColumnNames(properties: Map[String, String]): Map[String, String] = {
// We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable,
// while `CatalogTable` should be serializable.
properties.filterNot { case (key, _) =>
key.startsWith(VIEW_QUERY_OUTPUT_PREFIX)
}
}
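The serializability caveat in the comment above can be reproduced in isolation; a sketch (behavior as on Scala 2.11, where `filterKeys` returns a lazy, non-serializable view):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

val lazyView = Map("k" -> "v").filterKeys(_ => true) // MapLike#FilteredKeys, a lazy view
// new ObjectOutputStream(new ByteArrayOutputStream).writeObject(lazyView)
// -> java.io.NotSerializableException, which is why the eager filterNot is used here.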

/**
* Generate the view properties in CatalogTable, including:
* 1. view default database that is used to provide the default database name on view resolution.
* 2. the output column names of the query that creates a view; this is used to map the output
* of the view child to the view output during view resolution.
*
* @param properties the `properties` in CatalogTable.
* @param session the spark session.
* @param analyzedPlan the analyzed logical plan that represents the child of a view.
* @return new view properties including view default database and query column names properties.
*/
def generateViewProperties(
Contributor: looks like all other methods in this class can be private?

Contributor Author: yea, will update that.

properties: Map[String, String],
session: SparkSession,
analyzedPlan: LogicalPlan): Map[String, String] = {
// Generate the query column names; fail if the query output contains duplicate column names.
val queryOutput = analyzedPlan.schema.fieldNames
assert(queryOutput.distinct.size == queryOutput.size,
s"The view output ${queryOutput.mkString("(", ",", ")")} contains duplicate column names.")

// Generate the view default database name.
val viewDefaultDatabase = session.sessionState.catalog.getCurrentDatabase

removeQueryColumnNames(properties) ++
generateViewDefaultDatabase(viewDefaultDatabase) ++
generateQueryColumnNames(queryOutput)
}
}
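For concreteness, a sketch of what generateViewProperties returns. The literal key strings below assume the CatalogTable constants (VIEW_DEFAULT_DATABASE, VIEW_QUERY_OUTPUT_NUM_COLUMNS, VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX) expand to the values shown; treat them as illustrative:

// Suppose the current database is "default" and the view query outputs columns a and b.
// generateViewProperties(Map.empty, session, analyzedPlan) then yields roughly:
Map(
  "view.default.database"  -> "default", // VIEW_DEFAULT_DATABASE
  "view.query.out.numCols" -> "2",       // VIEW_QUERY_OUTPUT_NUM_COLUMNS
  "view.query.out.col.0"   -> "a",       // VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX + index
  "view.query.out.col.1"   -> "b"
)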
10 changes: 7 additions & 3 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2501,11 +2501,15 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}

test("should be able to resolve a persistent view") {
withTable("t1") {
withTable("t1", "t2") {
withView("v1") {
sql("CREATE TABLE `t1` USING parquet AS SELECT * FROM VALUES(1, 1) AS t1(a, b)")
sql("CREATE VIEW `v1` AS SELECT * FROM t1")
checkAnswer(spark.table("v1"), Row(1, 1))
sql("CREATE TABLE `t2` USING parquet AS SELECT * FROM VALUES('a', 2, 1.0) AS t2(d, e, f)")
sql("CREATE VIEW `v1`(x, y) AS SELECT * FROM t1")
checkAnswer(spark.table("v1").orderBy("x"), Row(1, 1))

sql("ALTER VIEW `v1` AS SELECT * FROM t2")
checkAnswer(spark.table("v1").orderBy("f"), Row("a", 2, 1.0))
}
}
}
@@ -370,28 +370,30 @@ class HiveDDLSuite
spark.range(10).write.saveAsTable(tabName)
val viewName = "view1"
withView(viewName) {
val catalog = spark.sessionState.catalog
def checkProperties(expected: Map[String, String]): Boolean = {
val properties = spark.sessionState.catalog.getTableMetadata(TableIdentifier(viewName))
.properties
properties.filterNot { case (key, value) =>
Seq("transient_lastDdlTime", CatalogTable.VIEW_DEFAULT_DATABASE).contains(key) ||
key.startsWith(CatalogTable.VIEW_QUERY_OUTPUT_PREFIX)
} == expected
}
sql(s"CREATE VIEW $viewName AS SELECT * FROM $tabName")

assert(catalog.getTableMetadata(TableIdentifier(viewName))
.properties.filter(_._1 != "transient_lastDdlTime") == Map())
checkProperties(Map())
sql(s"ALTER VIEW $viewName SET TBLPROPERTIES ('p' = 'an')")
assert(catalog.getTableMetadata(TableIdentifier(viewName))
.properties.filter(_._1 != "transient_lastDdlTime") == Map("p" -> "an"))
checkProperties(Map("p" -> "an"))

// no exception or message will be issued if we set it again
sql(s"ALTER VIEW $viewName SET TBLPROPERTIES ('p' = 'an')")
assert(catalog.getTableMetadata(TableIdentifier(viewName))
.properties.filter(_._1 != "transient_lastDdlTime") == Map("p" -> "an"))
checkProperties(Map("p" -> "an"))

// the value will be updated if we set the same key to a different value
sql(s"ALTER VIEW $viewName SET TBLPROPERTIES ('p' = 'b')")
assert(catalog.getTableMetadata(TableIdentifier(viewName))
.properties.filter(_._1 != "transient_lastDdlTime") == Map("p" -> "b"))
checkProperties(Map("p" -> "b"))

sql(s"ALTER VIEW $viewName UNSET TBLPROPERTIES ('p')")
assert(catalog.getTableMetadata(TableIdentifier(viewName))
.properties.filter(_._1 != "transient_lastDdlTime") == Map())
checkProperties(Map())

val message = intercept[AnalysisException] {
sql(s"ALTER VIEW $viewName UNSET TBLPROPERTIES ('p')")
@@ -644,10 +646,7 @@ class HiveDDLSuite
Seq(
Row("# View Information", "", ""),
Row("View Original Text:", "SELECT * FROM tbl", ""),
Row("View Expanded Text:",
"SELECT `gen_attr_0` AS `a` FROM (SELECT `gen_attr_0` FROM " +
"(SELECT `a` AS `gen_attr_0` FROM `default`.`tbl`) AS gen_subquery_0) AS tbl",
"")
Row("View Expanded Text:", "SELECT * FROM tbl", "")
)
))
}
@@ -222,13 +222,15 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}

test("correctly parse CREATE VIEW statement") {
sql(
"""CREATE VIEW IF NOT EXISTS
|default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
|TBLPROPERTIES ('a' = 'b')
|AS SELECT * FROM jt""".stripMargin)
checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
sql("DROP VIEW testView")
withView("testView") {
sql(
"""CREATE VIEW IF NOT EXISTS
|default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
|TBLPROPERTIES ('a' = 'b')
|AS SELECT * FROM jt
|""".stripMargin)
checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
}
}

test("correctly parse CREATE TEMPORARY VIEW statement") {