[SPARK-33141][SQL] Capture SQL configs when creating permanent views
### What changes were proposed in this pull request?
This PR makes `CreateViewCommand`/`AlterViewAsCommand` capture runtime SQL configs and store them as view properties. These configs are applied during the parsing and analysis phases of view resolution. Users can set `spark.sql.legacy.useCurrentConfigsForView` to `true` to restore the previous behavior.

### Why are the changes needed?
This PR is a sub-task of [SPARK-33138](https://issues.apache.org/jira/browse/SPARK-33138), which proposes to unify temp view and permanent view behaviors. It makes permanent views mimic the temp view behavior of "fixing" view semantics by directly storing the resolved LogicalPlan. For example, if a user uses Spark 2.4 to create a view that contains null values from division-by-zero expressions, she may not want other users' queries that reference her view to throw exceptions when running on Spark 3.x with ANSI mode on. An illustrative sketch (not an excerpt from this PR; it assumes a running `SparkSession` named `spark`, and that division by zero errors only under ANSI mode):
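```scala
// A view created while ANSI mode is off captures spark.sql.ansi.enabled=false:
spark.conf.set("spark.sql.ansi.enabled", "false")
spark.sql("CREATE VIEW v AS SELECT 1 / 0 AS c") // 1/0 yields NULL under non-ANSI semantics

// Another user later turns ANSI mode on. The view is still parsed and analyzed
// with its captured configs, so it keeps returning NULL instead of failing:
spark.conf.set("spark.sql.ansi.enabled", "true")
spark.sql("SELECT * FROM v").show()

// Opting back into the pre-3.1 behavior, where the current session configs apply:
spark.conf.set("spark.sql.legacy.useCurrentConfigsForView", "true")
spark.sql("SELECT * FROM v").show() // now analyzed under ANSI mode; 1/0 errors at runtime
```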

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added a unit test; improved existing unit tests.

Closes #30289 from luluorta/SPARK-33141.

Authored-by: luluorta <luluorta@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
luluorta authored and cloud-fan committed Nov 27, 2020
1 parent b9f2f78 commit 35ded12
Showing 9 changed files with 190 additions and 22 deletions.
docs/sql-migration-guide.md (2 additions, 0 deletions)

@@ -51,6 +51,8 @@ license: |
 - In Spark 3.1, the `schema_of_json` and `schema_of_csv` functions return the schema in the SQL format in which field names are quoted. In Spark 3.0, the functions return a catalog string without field quoting and in lower case.
 
 - In Spark 3.1, refreshing a table will trigger an uncache operation for all other caches that reference the table, even if the table itself is not cached. In Spark 3.0, the operation will only be triggered if the table itself is cached.
 
+- In Spark 3.1, creating or altering a view will capture runtime SQL configs and store them as view properties. These configs will be applied during the parsing and analysis phases of the view resolution. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.useCurrentConfigsForView` to `true`.
 
 ## Upgrading from Spark SQL 3.0 to 3.0.1
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

@@ -1034,7 +1034,9 @@ class Analyzer(override val catalogManager: CatalogManager)
             s"avoid errors. Increase the value of ${SQLConf.MAX_NESTED_VIEW_DEPTH.key} to work " +
             "around this.")
         }
-        executeSameContext(child)
+        SQLConf.withExistingConf(View.effectiveSQLConf(desc.viewSQLConfigs)) {
+          executeSameContext(child)
+        }
       }
       view.copy(child = newChild)
     case p @ SubqueryAlias(_, view: View) =>
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

@@ -795,14 +795,19 @@ class SessionCatalog(
 
     if (metadata.tableType == CatalogTableType.VIEW) {
       val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text."))
-      logDebug(s"'$viewText' will be used for the view($table).")
+      val viewConfigs = metadata.viewSQLConfigs
+      val viewPlan = SQLConf.withExistingConf(View.effectiveSQLConf(viewConfigs)) {
+        parser.parsePlan(viewText)
+      }
+
+      logDebug(s"'$viewText' will be used for the view($table) with configs: $viewConfigs.")
       // The relation is a view, so we wrap the relation by:
       // 1. Add a [[View]] operator over the relation to keep track of the view desc;
       // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view.
       val child = View(
         desc = metadata,
         output = metadata.schema.toAttributes,
-        child = parser.parsePlan(viewText))
+        child = viewPlan)
       SubqueryAlias(multiParts, child)
     } else {
       SubqueryAlias(multiParts, UnresolvedCatalogRelation(metadata, options))
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

@@ -305,6 +305,22 @@ case class CatalogTable(
     }
   }
 
+  /**
+   * Return the SQL configs captured when the view was created. These configs are applied
+   * during the parsing and analysis phases of view resolution, and should be empty if the
+   * CatalogTable is not a view or was created by an older version of Spark (before 3.1.0).
+   */
+  def viewSQLConfigs: Map[String, String] = {
+    try {
+      for ((key, value) <- properties if key.startsWith(CatalogTable.VIEW_SQL_CONFIG_PREFIX))
+        yield (key.substring(CatalogTable.VIEW_SQL_CONFIG_PREFIX.length), value)
+    } catch {
+      case e: Exception =>
+        throw new AnalysisException(
+          "Corrupted view SQL configs in catalog", cause = Some(e))
+    }
+  }
+
   /**
    * Return the output column names of the query that creates a view, the column names are used to
    * resolve a view, should be empty if the CatalogTable is not a View or created by older versions

@@ -411,6 +427,8 @@ object CatalogTable {
     props.toMap
   }
 
+  val VIEW_SQL_CONFIG_PREFIX = VIEW_PREFIX + "sqlConfig."
+
   val VIEW_QUERY_OUTPUT_PREFIX = VIEW_PREFIX + "query.out."
   val VIEW_QUERY_OUTPUT_NUM_COLUMNS = VIEW_QUERY_OUTPUT_PREFIX + "numCols"
   val VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX = VIEW_QUERY_OUTPUT_PREFIX + "col."
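To make the storage format concrete, here is a rough sketch (not part of the commit) of how captured configs live in `CatalogTable.properties`, assuming `VIEW_PREFIX` is `"view."` so that `VIEW_SQL_CONFIG_PREFIX` expands to `"view.sqlConfig."`:

```scala
// Hypothetical property map for a view created under non-ANSI semantics.
val properties = Map(
  "view.sqlConfig.spark.sql.ansi.enabled" -> "false", // a captured SQL config
  "view.query.out.numCols" -> "1")                    // unrelated view property

// The same prefix-strip logic as viewSQLConfigs above:
val prefix = "view.sqlConfig."
val viewConfigs = for {
  (key, value) <- properties if key.startsWith(prefix)
} yield (key.substring(prefix.length), value)
// viewConfigs == Map("spark.sql.ansi.enabled" -> "false")
```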
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

@@ -453,6 +453,22 @@ case class View(
   }
 }
 
+object View {
+  def effectiveSQLConf(configs: Map[String, String]): SQLConf = {
+    val activeConf = SQLConf.get
+    if (activeConf.useCurrentSQLConfigsForView) return activeConf
+
+    val sqlConf = new SQLConf()
+    for ((k, v) <- configs) {
+      sqlConf.settings.put(k, v)
+    }
+    // We should respect the current maxNestedViewDepth, because view resolution is
+    // executed from top to bottom.
+    sqlConf.setConf(SQLConf.MAX_NESTED_VIEW_DEPTH, activeConf.maxNestedViewDepth)
+    sqlConf
+  }
+}
+
 /**
  * A container for holding named common table expressions (CTEs) and a query plan.
  * This operator will be removed during analysis and the relations will be substituted into child.
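Pulled out of the surrounding diffs, the intended resolution flow looks roughly like this (a sketch using the internal APIs shown above; `table` and `parser` are hypothetical stand-ins for a view's `CatalogTable` and a SQL parser):

```scala
// Build the effective conf from the view's captured configs. Under the legacy flag
// spark.sql.legacy.useCurrentConfigsForView=true this is just the active conf;
// otherwise it is a fresh SQLConf seeded with the captured entries, plus the
// current maxNestedViewDepth.
val confForView = View.effectiveSQLConf(table.viewSQLConfigs)

// Parse (and later analyze) the view text with that conf as the active one:
val viewPlan = SQLConf.withExistingConf(confForView) {
  parser.parsePlan(table.viewText.get)
}
```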
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

@@ -1481,6 +1481,15 @@ object SQLConf {
       "must be positive.")
     .createWithDefault(100)
 
+  val USE_CURRENT_SQL_CONFIGS_FOR_VIEW =
+    buildConf("spark.sql.legacy.useCurrentConfigsForView")
+      .internal()
+      .doc("When true, SQL configs of the current active SparkSession, instead of the " +
+        "captured ones, will be applied during the parsing and analysis phases of view resolution.")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val STREAMING_FILE_COMMIT_PROTOCOL_CLASS =
     buildConf("spark.sql.streaming.commitProtocolClass")
       .version("2.1.0")

@@ -3415,6 +3424,8 @@ class SQLConf extends Serializable with Logging {
 
   def maxNestedViewDepth: Int = getConf(SQLConf.MAX_NESTED_VIEW_DEPTH)
 
+  def useCurrentSQLConfigsForView: Boolean = getConf(SQLConf.USE_CURRENT_SQL_CONFIGS_FOR_VIEW)
+
   def starSchemaDetection: Boolean = getConf(STARSCHEMA_DETECTION)
 
   def starSchemaFTRatio: Double = getConf(STARSCHEMA_FACT_TABLE_RATIO)
sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala

@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeRef
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
-import org.apache.spark.sql.internal.StaticSQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StringType}
 import org.apache.spark.sql.util.SchemaUtils
 
@@ -334,6 +334,18 @@ case class ShowViewsCommand(
 
 object ViewHelper {
 
+  private val configPrefixDenyList = Seq(
+    SQLConf.MAX_NESTED_VIEW_DEPTH.key,
+    "spark.sql.optimizer.",
+    "spark.sql.codegen.",
+    "spark.sql.execution.",
+    "spark.sql.shuffle.",
+    "spark.sql.adaptive.")
+
+  private def shouldCaptureConfig(key: String): Boolean = {
+    !configPrefixDenyList.exists(prefix => key.startsWith(prefix))
+  }
+
   import CatalogTable._
 
   /**
@@ -361,11 +373,37 @@ object ViewHelper {
     }
   }
 
+  /**
+   * Convert the view SQL configs to `properties`.
+   */
+  private def sqlConfigsToProps(conf: SQLConf): Map[String, String] = {
+    val modifiedConfs = conf.getAllConfs.filter { case (k, _) =>
+      conf.isModifiable(k) && shouldCaptureConfig(k)
+    }
+    val props = new mutable.HashMap[String, String]
+    for ((key, value) <- modifiedConfs) {
+      props.put(s"$VIEW_SQL_CONFIG_PREFIX$key", value)
+    }
+    props.toMap
+  }
+
+  /**
+   * Remove the view SQL configs from `properties`.
+   */
+  private def removeSQLConfigs(properties: Map[String, String]): Map[String, String] = {
+    // We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable,
+    // while `CatalogTable` should be serializable.
+    properties.filterNot { case (key, _) =>
+      key.startsWith(VIEW_SQL_CONFIG_PREFIX)
+    }
+  }
+
   /**
    * Generate the view properties in CatalogTable, including:
    * 1. view default database that is used to provide the default database name on view resolution.
    * 2. the output column names of the query that creates a view, this is used to map the output of
    *    the view child to the view output during view resolution.
+   * 3. the SQL configs when creating the view.
    *
    * @param properties the `properties` in CatalogTable.
    * @param session the spark session.
@@ -380,15 +418,18 @@ object ViewHelper {
     // for createViewCommand queryOutput may be different from fieldNames
     val queryOutput = analyzedPlan.schema.fieldNames
 
+    val conf = session.sessionState.conf
+
     // Generate the query column names, throw an AnalysisException if there exists duplicate column
     // names.
     SchemaUtils.checkColumnNameDuplication(
-      fieldNames, "in the view definition", session.sessionState.conf.resolver)
+      fieldNames, "in the view definition", conf.resolver)
 
-    // Generate the view default catalog and namespace.
+    // Generate the view default catalog and namespace, as well as captured SQL configs.
     val manager = session.sessionState.catalogManager
-    removeQueryColumnNames(properties) ++
+    removeSQLConfigs(removeQueryColumnNames(properties)) ++
       catalogAndNamespaceToProps(manager.currentCatalog.name, manager.currentNamespace) ++
+      sqlConfigsToProps(conf) ++
       generateQueryColumnNames(queryOutput)
   }
 
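The capture rule implemented above stores only configs that are both modifiable and outside the deny list; a standalone, REPL-style sketch (the deny list is copied from the diff, except that the `maxNestedViewDepth` entry is referenced there symbolically via `SQLConf.MAX_NESTED_VIEW_DEPTH.key`):

```scala
val denyListPrefixes = Seq(
  "spark.sql.optimizer.",
  "spark.sql.codegen.",
  "spark.sql.execution.",
  "spark.sql.shuffle.",
  "spark.sql.adaptive.")

def shouldCapture(key: String): Boolean =
  !denyListPrefixes.exists(prefix => key.startsWith(prefix))

assert(shouldCapture("spark.sql.ansi.enabled"))      // captured as a view property
assert(!shouldCapture("spark.sql.adaptive.enabled")) // execution tuning, not captured
```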
(Diffs for the remaining 2 of the 9 changed files are not shown.)