[SPARK-19024][SQL] Implement new approach to write a permanent view #16613
Closed
Changes from all 5 commits:
917ca04  implement view write path for the new approach. (jiangxb1987)
9d582a4  update failed test cases in HiveDDLSuite. (jiangxb1987)
2d49ef2  code refactor. (jiangxb1987)
e2ccdd5  refactor aliasedPlan. (jiangxb1987)
7c5b6af  code refactor. (jiangxb1987)
@@ -17,10 +17,10 @@
 package org.apache.spark.sql.execution.command

-import scala.util.control.NonFatal
+import scala.collection.mutable

 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.Alias
@@ -64,9 +64,9 @@ object PersistedView extends ViewType

 /**
- * Create or replace a view with given query plan. This command will convert the query plan to
- * canonicalized SQL string, and store it as view text in metastore, if we need to create a
- * permanent view.
+ * Create or replace a view with given query plan. This command will generate view-specific
+ * properties (e.g. the view default database, the view query output column names) and store
+ * them as properties in the metastore, if we need to create a permanent view.
  *
  * @param name the name of this view.
  * @param userSpecifiedColumns the output column names and optional comments specified by users,
@@ -75,8 +75,8 @@ object PersistedView extends ViewType
  * @param properties the properties of this view.
  * @param originalText the original SQL text of this view, can be None if this view is created via
  *                     Dataset API.
- * @param child the logical plan that represents the view; this is used to generate a canonicalized
- *              version of the SQL that can be saved in the catalog.
+ * @param child the logical plan that represents the view; this is used to generate the logical
+ *              plan for the temporary view and the view schema.
  * @param allowExisting if true, and if the view already exists, noop; if false, and if the view
  *                      already exists, throws analysis exception.
  * @param replace if true, and if the view already exists, updates it; if false, and if the view
@@ -95,6 +95,8 @@ case class CreateViewCommand(
     viewType: ViewType)
   extends RunnableCommand {

+  import ViewHelper._
+
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)

   if (viewType == PersistedView) {
@@ -137,22 +139,12 @@ case class CreateViewCommand(
     // This should be called after `qe.assertAnalyzed()` (i.e., `child` can be resolved)
     verifyTemporaryObjectsNotExists(sparkSession)

-    val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
-      analyzedPlan
-    } else {
-      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
-        case (attr, (colName, None)) => Alias(attr, colName)()
-        case (attr, (colName, Some(colComment))) =>
-          val meta = new MetadataBuilder().putString("comment", colComment).build()
-          Alias(attr, colName)(explicitMetadata = Some(meta))
-      }
-      sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
-    }
-
     val catalog = sparkSession.sessionState.catalog
     if (viewType == LocalTempView) {
+      val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
       catalog.createTempView(name.table, aliasedPlan, overrideIfExists = replace)
     } else if (viewType == GlobalTempView) {
+      val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
       catalog.createGlobalTempView(name.table, aliasedPlan, overrideIfExists = replace)
     } else if (catalog.tableExists(name)) {
       val tableMetadata = catalog.getTableMetadata(name)
@@ -163,7 +155,7 @@ case class CreateViewCommand(
         throw new AnalysisException(s"$name is not a view")
       } else if (replace) {
         // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
-        catalog.alterTable(prepareTable(sparkSession, aliasedPlan))
+        catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
       } else {
         // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
         // exists.
@@ -173,7 +165,7 @@ case class CreateViewCommand(
       }
     } else {
       // Create the view if it doesn't exist.
-      catalog.createTable(prepareTable(sparkSession, aliasedPlan), ignoreIfExists = false)
+      catalog.createTable(prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
     }
     Seq.empty[Row]
   }
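For orientation, the branches above map onto the different CREATE VIEW variants. A minimal usage sketch, assuming a SparkSession named spark and an existing table src (both names are illustrative, not from this PR):

  // LocalTempView path: registered in the session-local temp view catalog.
  spark.sql("CREATE TEMPORARY VIEW temp_v AS SELECT key, value FROM src")

  // GlobalTempView path: registered in the global temp view database.
  spark.sql("CREATE GLOBAL TEMPORARY VIEW global_v AS SELECT key, value FROM src")

  // PersistedView path: goes through prepareTable and the metastore.
  spark.sql("CREATE OR REPLACE VIEW persisted_v AS SELECT key, value FROM src")

  // A user-specified column list exercises aliasPlan: outputs are renamed and commented.
  spark.sql("CREATE VIEW aliased_v (k COMMENT 'renamed key') AS SELECT key FROM src")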
@@ -207,29 +199,44 @@ case class CreateViewCommand(
   }

   /**
-   * Returns a [[CatalogTable]] that can be used to save in the catalog. This comment canonicalize
-   * SQL based on the analyzed plan, and also creates the proper schema for the view.
+   * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns,
+   * else return the analyzed plan directly.
    */
-  private def prepareTable(sparkSession: SparkSession, aliasedPlan: LogicalPlan): CatalogTable = {
-    val viewSQL: String = new SQLBuilder(aliasedPlan).toSQL
-
-    // Validate the view SQL - make sure we can parse it and analyze it.
-    // If we cannot analyze the generated query, there is probably a bug in SQL generation.
-    try {
-      sparkSession.sql(viewSQL).queryExecution.assertAnalyzed()
-    } catch {
-      case NonFatal(e) =>
-        throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
+  private def aliasPlan(session: SparkSession, analyzedPlan: LogicalPlan): LogicalPlan = {
+    if (userSpecifiedColumns.isEmpty) {
+      analyzedPlan
+    } else {
+      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
+        case (attr, (colName, None)) => Alias(attr, colName)()
+        case (attr, (colName, Some(colComment))) =>
+          val meta = new MetadataBuilder().putString("comment", colComment).build()
+          Alias(attr, colName)(explicitMetadata = Some(meta))
+      }
+      session.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
     }
   }

+  /**
+   * Returns a [[CatalogTable]] that can be used to save in the catalog. Generates the view-specific
+   * properties (e.g. the view default database, the view query output column names), stores them
+   * as properties in the CatalogTable, and also creates the proper schema for the view.
+   */
+  private def prepareTable(session: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = {
+    if (originalText.isEmpty) {
+      throw new AnalysisException(
+        "It is not allowed to create a persisted view from the Dataset API")
+    }
+
+    val newProperties = generateViewProperties(properties, session, analyzedPlan)
+
     CatalogTable(
       identifier = name,
       tableType = CatalogTableType.VIEW,
       storage = CatalogStorageFormat.empty,
-      schema = aliasedPlan.schema,
-      properties = properties,
+      schema = aliasPlan(session, analyzedPlan).schema,
+      properties = newProperties,
       viewOriginalText = originalText,
-      viewText = Some(viewSQL),
+      viewText = originalText,
       comment = comment
     )
   }
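To make the new prepareTable output concrete, here is a sketch of the catalog entry a persisted view could end up with under this PR. The identifier, schema, and property key strings below are invented for illustration; the real code goes through the CatalogTable constants (VIEW_DEFAULT_DATABASE and friends) rather than literal strings:

  import org.apache.spark.sql.catalyst.TableIdentifier
  import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
  import org.apache.spark.sql.types.StructType

  // Hypothetical result of prepareTable for: CREATE VIEW v AS SELECT key, value FROM src
  CatalogTable(
    identifier = TableIdentifier("v", Some("default")),
    tableType = CatalogTableType.VIEW,
    storage = CatalogStorageFormat.empty,
    schema = new StructType().add("key", "int").add("value", "string"),
    properties = Map(
      "view.default.database" -> "default",   // VIEW_DEFAULT_DATABASE
      "view.query.out.numCols" -> "2",        // VIEW_QUERY_OUTPUT_NUM_COLUMNS
      "view.query.out.col.0" -> "key",        // VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX + index
      "view.query.out.col.1" -> "value"),
    viewOriginalText = Some("SELECT key, value FROM src"),
    // The key change: viewText now stores the original SQL, not canonicalized SQL.
    viewText = Some("SELECT key, value FROM src"))

The difference from the old write path is visible in the last field: there is no SQLBuilder round trip, so the text stored in the metastore is exactly what the user wrote.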
@@ -244,14 +251,16 @@ case class CreateViewCommand(
  * @param name the name of this view.
  * @param originalText the original SQL text of this view. Note that we can only alter a view by
  *                     SQL API, which means we always have originalText.
- * @param query the logical plan that represents the view; this is used to generate a canonicalized
- *              version of the SQL that can be saved in the catalog.
+ * @param query the logical plan that represents the view; this is used to generate the new view
+ *              schema.
  */
 case class AlterViewAsCommand(
     name: TableIdentifier,
     originalText: String,
     query: LogicalPlan) extends RunnableCommand {

+  import ViewHelper._
+
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)

   override def run(session: SparkSession): Seq[Row] = {
@@ -275,21 +284,80 @@ case class AlterViewAsCommand(
       throw new AnalysisException(s"${viewMeta.identifier} is not a view.")
     }

-    val viewSQL: String = new SQLBuilder(analyzedPlan).toSQL
-    // Validate the view SQL - make sure we can parse it and analyze it.
-    // If we cannot analyze the generated query, there is probably a bug in SQL generation.
-    try {
-      session.sql(viewSQL).queryExecution.assertAnalyzed()
-    } catch {
-      case NonFatal(e) =>
-        throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
-    }
+    val newProperties = generateViewProperties(viewMeta.properties, session, analyzedPlan)

     val updatedViewMeta = viewMeta.copy(
       schema = analyzedPlan.schema,
+      properties = newProperties,
       viewOriginalText = Some(originalText),
-      viewText = Some(viewSQL))
+      viewText = Some(originalText))

     session.sessionState.catalog.alterTable(updatedViewMeta)
   }
 }
+
+object ViewHelper {
+
+  import CatalogTable._
+
+  /**
+   * Generate the view default database in `properties`.
+   */
+  private def generateViewDefaultDatabase(databaseName: String): Map[String, String] = {
+    Map(VIEW_DEFAULT_DATABASE -> databaseName)
+  }
+
+  /**
+   * Generate the view query output column names in `properties`.
+   */
+  private def generateQueryColumnNames(columns: Seq[String]): Map[String, String] = {
+    val props = new mutable.HashMap[String, String]
+    if (columns.nonEmpty) {
+      props.put(VIEW_QUERY_OUTPUT_NUM_COLUMNS, columns.length.toString)
+      columns.zipWithIndex.foreach { case (colName, index) =>
+        props.put(s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index", colName)
+      }
+    }
+    props.toMap
+  }
+
+  /**
+   * Remove the view query output column names in `properties`.
+   */
+  private def removeQueryColumnNames(properties: Map[String, String]): Map[String, String] = {
+    // We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable,
+    // while `CatalogTable` should be serializable.
+    properties.filterNot { case (key, _) =>
+      key.startsWith(VIEW_QUERY_OUTPUT_PREFIX)
+    }
+  }
+
+  /**
+   * Generate the view properties in CatalogTable, including:
+   * 1. view default database that is used to provide the default database name on view resolution.
+   * 2. the output column names of the query that creates a view; this is used to map the output of
+   *    the view child to the view output during view resolution.
+   *
+   * @param properties the `properties` in CatalogTable.
+   * @param session the Spark session.
+   * @param analyzedPlan the analyzed logical plan that represents the child of a view.
+   * @return new view properties including view default database and query column names properties.
+   */
+  def generateViewProperties(
+      properties: Map[String, String],
+      session: SparkSession,
+      analyzedPlan: LogicalPlan): Map[String, String] = {
+    // Generate the query column names; fail if the query output contains duplicate column names.
+    val queryOutput = analyzedPlan.schema.fieldNames
+    assert(queryOutput.distinct.size == queryOutput.size,
+      s"The view output ${queryOutput.mkString("(", ",", ")")} contains duplicate column name.")
+
+    // Generate the view default database name.
+    val viewDefaultDatabase = session.sessionState.catalog.getCurrentDatabase
+
+    removeQueryColumnNames(properties) ++
+      generateViewDefaultDatabase(viewDefaultDatabase) ++
+      generateQueryColumnNames(queryOutput)
+  }
+}

Inline review on the `def generateViewProperties(` line:
Reviewer: "looks like all other methods in this class can be …"
Reply: "yea, will update that."
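The interplay of removeQueryColumnNames and generateQueryColumnNames matters most on ALTER VIEW, where stale column-name properties from the previous definition must not survive. A self-contained sketch of that behavior, with the property prefix written out as an assumed literal rather than the CatalogTable constant:

  // Standalone sketch (not Spark's actual code) of the regenerate-on-alter behavior:
  // stale view.query.out.* entries are dropped before the new ones are added.
  val VIEW_QUERY_OUTPUT_PREFIX = "view.query.out."  // assumed key prefix

  def regenerate(old: Map[String, String], newCols: Seq[String]): Map[String, String] = {
    val cleaned = old.filterNot { case (k, _) => k.startsWith(VIEW_QUERY_OUTPUT_PREFIX) }
    cleaned ++
      Map(VIEW_QUERY_OUTPUT_PREFIX + "numCols" -> newCols.length.toString) ++
      newCols.zipWithIndex.map { case (c, i) => (s"${VIEW_QUERY_OUTPUT_PREFIX}col.$i", c) }
  }

  // ALTER VIEW changes the query from two columns to one:
  val before = Map(
    "view.default.database" -> "default",
    "view.query.out.numCols" -> "2",
    "view.query.out.col.0" -> "key",
    "view.query.out.col.1" -> "value")
  regenerate(before, Seq("key"))
  // -> keeps view.default.database, rewrites numCols to "1", drops the stale col.1 entry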
Review discussion:

Reviewer: "something we can clean up: Hive will expand the view text, so it needs 2 fields: originalText and viewText. Since we don't expand the view text, but only add table properties, I think we only need a single field viewText in CatalogTable."

Reply: "I'm afraid that would require changes in several tens of places. Should we do that in a separate PR?"

Reviewer: "yea, in a separate PR."
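For readers unfamiliar with the Hive behavior referenced here: Hive stores the user's SQL and a rewritten, fully qualified expansion in separate fields, while this PR stores the original text in both and leans on the generated properties for resolution. A rough illustration, where the expanded form is an assumption about how Hive typically rewrites a view:

  // Hive-style metadata needs two fields because viewText is an expanded rewrite:
  val hiveOriginalText = "SELECT * FROM src"
  val hiveViewText     = "SELECT `src`.`key`, `src`.`value` FROM `default`.`src`"  // assumed expansion

  // Under this PR both fields carry the same string, which is why a single
  // viewText field in CatalogTable would suffice (the cleanup deferred above):
  val sparkOriginalText = "SELECT * FROM src"
  val sparkViewText     = "SELECT * FROM src"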