[SPARK-33141][SQL] Capture SQL configs when creating permanent views #30289

Closed · wants to merge 3 commits
Changes from 1 commit
2 changes: 2 additions & 0 deletions docs/sql-migration-guide.md
@@ -51,6 +51,8 @@ license: |
- In Spark 3.1, the `schema_of_json` and `schema_of_csv` functions return the schema in the SQL format in which field names are quoted. In Spark 3.0, the function returns a catalog string without field quoting and in lower case.

- In Spark 3.1, refreshing a table will trigger an uncache operation for all other caches that reference the table, even if the table itself is not cached. In Spark 3.0 the operation will only be triggered if the table itself is cached.

- In Spark 3.1, creating or altering a permanent view will capture runtime SQL configs and store them as view properties. These configs will be applied during the parsing and analysis phases of view resolution. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.view.applySQLConfigs` to `false`.

## Upgrading from Spark SQL 3.0 to 3.0.1

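A minimal sketch of the behavior change described in the migration note above; `spark` is an active SparkSession and `t` an existing table (both illustrative), and the legacy config name is the one introduced by this commit:

// Configs in effect at CREATE VIEW time are captured and stored with the view.
spark.sql("SET spark.sql.ansi.enabled = true")
spark.sql("CREATE VIEW v AS SELECT * FROM t")

// Later, even if the session config changes, Spark 3.1 parses and analyzes the
// view query under the captured value (ANSI mode on), not the current session value.
spark.sql("SET spark.sql.ansi.enabled = false")
spark.sql("SELECT * FROM v").show()

// The pre-3.1 behavior (always use the current session configs) can be restored with:
spark.sql("SET spark.sql.legacy.view.applySQLConfigs = false")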
@@ -1016,7 +1016,9 @@ class Analyzer(override val catalogManager: CatalogManager)
s"avoid errors. Increase the value of ${SQLConf.MAX_NESTED_VIEW_DEPTH.key} to work " +
"around this.")
}
executeSameContext(child)
SQLConf.withExistingConf(View.effectiveSQLConf(desc.viewQuerySQLConfigs)) {
executeSameContext(child)
}
}
view.copy(child = newChild)
case p @ SubqueryAlias(_, view: View) =>
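The wrapper above relies on SQLConf.withExistingConf, which installs a given SQLConf as the thread-local conf returned by SQLConf.get for the duration of a block and restores the previous one afterwards. A minimal standalone sketch of that pattern (config key and value are illustrative):

import org.apache.spark.sql.internal.SQLConf

val pinned = new SQLConf()
pinned.setConfString("spark.sql.caseSensitive", "true")

SQLConf.withExistingConf(pinned) {
  // Anything that reads SQLConf.get inside this block, such as the analyzer rules above,
  // sees the pinned conf rather than the active session's conf.
  assert(SQLConf.get.caseSensitiveAnalysis)
}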
@@ -795,14 +795,19 @@ class SessionCatalog(

if (metadata.tableType == CatalogTableType.VIEW) {
val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text."))
logDebug(s"'$viewText' will be used for the view($table).")
val viewConfigs = metadata.viewQuerySQLConfigs
val viewPlan = SQLConf.withExistingConf(View.effectiveSQLConf(viewConfigs)) {
parser.parsePlan(viewText)
}

logDebug(s"'$viewText' will be used for the view($table) with configs: $viewConfigs.")
// The relation is a view, so we wrap the relation by:
// 1. Add a [[View]] operator over the relation to keep track of the view desc;
// 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view.
val child = View(
desc = metadata,
output = metadata.schema.toAttributes,
child = parser.parsePlan(viewText))
child = viewPlan)
SubqueryAlias(multiParts, child)
} else {
SubqueryAlias(multiParts, UnresolvedCatalogRelation(metadata, options))
@@ -25,6 +25,7 @@ import scala.collection.mutable
import scala.util.control.NonFatal

import org.apache.commons.lang3.StringUtils
import org.json4s.jackson.JsonMethods

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
@@ -38,6 +39,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.JsonProtocol


/**
@@ -321,6 +323,23 @@ case class CatalogTable(
)
}

/**
* Return the SQL configs of the query that created this view. These configs are applied
* during parsing and analysis of the view. The result is empty if the CatalogTable is not
* a view or was created by an older version of Spark (before 3.1.0).
*/
def viewQuerySQLConfigs: Map[String, String] = {
try {
properties.get(CatalogTable.VIEW_QUERY_SQL_CONFIGS)
.map(confJson => JsonProtocol.mapFromJson(JsonMethods.parse(confJson)).toMap)
.getOrElse(Map.empty)
} catch {
case e: Exception =>
throw new AnalysisException(
"Corrupted view query SQL configs in catalog", cause = Some(e))
}
}

/** Syntactic sugar to update a field in `storage`. */
def withNewStorage(
locationUri: Option[URI] = storage.locationUri,
@@ -414,6 +433,8 @@ object CatalogTable {
val VIEW_QUERY_OUTPUT_PREFIX = VIEW_PREFIX + "query.out."
val VIEW_QUERY_OUTPUT_NUM_COLUMNS = VIEW_QUERY_OUTPUT_PREFIX + "numCols"
val VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX = VIEW_QUERY_OUTPUT_PREFIX + "col."

val VIEW_QUERY_SQL_CONFIGS = VIEW_PREFIX + "query.sqlConfigs"
Reviewer comment: This is similar to the current catalog/namespace, which is about the context, not the query. Can we define it close to VIEW_CATALOG_AND_NAMESPACE and follow its property key naming?

Author reply: done

}

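A rough sketch of the round trip between the captured config map and the single JSON-encoded table property used in this commit (map contents are illustrative; see the review discussion further down about switching to one property per config):

import org.json4s.jackson.JsonMethods._

import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.util.JsonProtocol

val captured = Map("spark.sql.ansi.enabled" -> "true", "spark.sql.caseSensitive" -> "false")

// Write side (mirrors generateQuerySQLConfigs further down): serialize the map into one property value.
val confJson = compact(render(JsonProtocol.mapToJson(captured)))
val properties = Map(CatalogTable.VIEW_QUERY_SQL_CONFIGS -> confJson)

// Read side (mirrors viewQuerySQLConfigs above): parse the property back into a Map.
val restored = properties.get(CatalogTable.VIEW_QUERY_SQL_CONFIGS)
  .map(json => JsonProtocol.mapFromJson(parse(json)).toMap)
  .getOrElse(Map.empty[String, String])

assert(restored == captured)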
@@ -17,17 +17,14 @@

package org.apache.spark.sql.catalyst.plans.logical

import scala.collection.mutable

import org.apache.spark.sql.catalyst.AliasIdentifier
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.random.RandomSampler
@@ -456,6 +453,20 @@ case class View(
}
}

object View {
def effectiveSQLConf(configs: Map[String, String]): SQLConf = {
val activeConf = SQLConf.get
if (!activeConf.applyViewSQLConfigs) return activeConf

val sqlConf = new SQLConf()
for ((k, v) <- configs) {
sqlConf.settings.put(k, v)
}
sqlConf.setConf(SQLConf.MAX_NESTED_VIEW_DEPTH, activeConf.maxNestedViewDepth)
sqlConf
}
}

/**
* A container for holding named common table expressions (CTEs) and a query plan.
* This operator will be removed during analysis and the relations will be substituted into child.
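A small sketch of what View.effectiveSQLConf yields with the new flag left at its default of true: the captured entries populate a fresh SQLConf, while the session's nested-view depth limit is carried over so the depth check in the Analyzer still reflects the current session. The config map is illustrative:

import org.apache.spark.sql.catalyst.plans.logical.View
import org.apache.spark.sql.internal.SQLConf

val captured = Map("spark.sql.ansi.enabled" -> "true")
val viewConf = View.effectiveSQLConf(captured)

// The captured entry is visible on the returned conf...
assert(viewConf.getConfString("spark.sql.ansi.enabled") == "true")
// ...while the current session's nested-view depth limit is preserved rather than reset.
assert(viewConf.maxNestedViewDepth == SQLConf.get.maxNestedViewDepth)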
@@ -1448,6 +1448,15 @@ object SQLConf {
"must be positive.")
.createWithDefault(100)

val APPLY_VIEW_SQL_CONFIGS =
buildConf("spark.sql.legacy.view.applySQLConfigs")
Reviewer comment: A legacy config name should describe the legacy behavior. How about spark.sql.legacy.useCurrentConfigsForView?

Author reply: done

.internal()
.doc("When true, captured SQL Configs will be applied during the parsing and analysis " +
"phases of the view resolution.")
.version("3.1.0")
.booleanConf
.createWithDefault(true)

val STREAMING_FILE_COMMIT_PROTOCOL_CLASS =
buildConf("spark.sql.streaming.commitProtocolClass")
.version("2.1.0")
@@ -3385,6 +3394,8 @@ class SQLConf extends Serializable with Logging {

def maxNestedViewDepth: Int = getConf(SQLConf.MAX_NESTED_VIEW_DEPTH)

def applyViewSQLConfigs: Boolean = getConf(SQLConf.APPLY_VIEW_SQL_CONFIGS)

def starSchemaDetection: Boolean = getConf(STARSCHEMA_DETECTION)

def starSchemaFTRatio: Double = getConf(STARSCHEMA_FACT_TABLE_RATIO)
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.command

import scala.collection.mutable

import org.json4s.jackson.JsonMethods._

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, PersistedView, UnresolvedFunction, UnresolvedRelation, ViewType}
@@ -27,9 +29,10 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeRef
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StringType}
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.util.JsonProtocol

/**
* Create or replace a view with given query plan. This command will generate some view-specific
@@ -334,6 +337,21 @@ case class ShowViewsCommand(

object ViewHelper {

private val configPrefixBlacklist = Seq(
Reviewer comment: nit: configPrefixDenyList

Author reply: done

SQLConf.MAX_NESTED_VIEW_DEPTH.key,
"spark.sql.optimizer.",
"spark.sql.codegen.",
"spark.sql.execution.",
"spark.sql.shuffle.",
"spark.sql.adaptive.")

private def isConfigBlacklisted(key: String): Boolean = {
Reviewer comment: nit:

def shouldCaptureConfig(key: String): Boolean = {
  !configPrefixDenyList.exists(prefix => key.startsWith(prefix))
}

Author reply: done

for (prefix <- configPrefixBlacklist if key.startsWith(prefix)) {
return true
}
false
}

import CatalogTable._

@@ -361,11 +379,38 @@ object ViewHelper {
}
}

/**
* Convert the view query SQL configs to `properties`.
*/
private def generateQuerySQLConfigs(conf: SQLConf): Map[String, String] = {
Reviewer comment: sqlConfigsToProps, following catalogAndNamespaceToProps

Author reply: done

val modifiedConfs = conf.getAllConfs.filter { case (k, _) =>
conf.isModifiable(k) && !isConfigBlacklisted(k)
}
val props = new mutable.HashMap[String, String]
if (modifiedConfs.nonEmpty) {
val confJson = compact(render(JsonProtocol.mapToJson(modifiedConfs)))
props.put(VIEW_QUERY_SQL_CONFIGS, confJson)
Reviewer comment: Have you stress-tested this? The Hive metastore has a limit on property value length; take a look at HiveExternalCatalog.tableMetaToTableProps. Another idea is to put one config per table property entry.

Author reply (@luluorta, Nov 25, 2020): Thanks for pointing this out. Splitting a large value string into small chunks seems like a Hive-specific solution, so I changed it to store one config per table property entry, each with a "view.sqlConfig." prefix (see the sketch after this method).

}
props.toMap
}
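A hedged sketch of the per-entry layout the author describes in the thread above (one table property per captured config, keyed with a "view.sqlConfig." prefix). The helper names follow the reviewer's suggested shouldCaptureConfig/sqlConfigsToProps naming and are not the code in this commit:

import org.apache.spark.sql.internal.SQLConf

val configPrefixDenyList = Seq(
  SQLConf.MAX_NESTED_VIEW_DEPTH.key,
  "spark.sql.optimizer.",
  "spark.sql.codegen.",
  "spark.sql.execution.",
  "spark.sql.shuffle.",
  "spark.sql.adaptive.")

def shouldCaptureConfig(key: String): Boolean =
  !configPrefixDenyList.exists(prefix => key.startsWith(prefix))

val sqlConfigPrefix = "view.sqlConfig."  // per-config property key prefix mentioned in the reply above

// Capture: one table property per modifiable, capturable config.
def sqlConfigsToProps(conf: SQLConf): Map[String, String] =
  conf.getAllConfs
    .filter { case (k, _) => conf.isModifiable(k) && shouldCaptureConfig(k) }
    .map { case (k, v) => (sqlConfigPrefix + k) -> v }

// Restore: strip the prefix to recover the original config keys.
def sqlConfigsFromProps(props: Map[String, String]): Map[String, String] =
  props.collect { case (k, v) if k.startsWith(sqlConfigPrefix) => k.stripPrefix(sqlConfigPrefix) -> v }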

/**
* Remove the view query SQL configs from `properties`.
*/
private def removeQuerySQLConfigs(properties: Map[String, String]): Map[String, String] = {
// We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable,
// while `CatalogTable` should be serializable.
properties.filterNot { case (key, _) =>
key == VIEW_QUERY_SQL_CONFIGS
}
}

/**
* Generate the view properties in CatalogTable, including:
* 1. view default database that is used to provide the default database name on view resolution.
* 2. the output column names of the query that creates a view, this is used to map the output of
* the view child to the view output during view resolution.
* 3. the SQL configs captured when the view is created.
*
* @param properties the `properties` in CatalogTable.
* @param session the spark session.
@@ -380,16 +425,19 @@
// for createViewCommand queryOutput may be different from fieldNames
val queryOutput = analyzedPlan.schema.fieldNames

val conf = SQLConf.get
Reviewer comment: Since we have session passed in, it seems better to use session.sessionState.conf.

Author reply: done


// Generate the query column names, throw an AnalysisException if there exists duplicate column
// names.
SchemaUtils.checkColumnNameDuplication(
fieldNames, "in the view definition", session.sessionState.conf.resolver)
fieldNames, "in the view definition", conf.resolver)

// Generate the view default catalog and namespace.
val manager = session.sessionState.catalogManager
removeQueryColumnNames(properties) ++
removeQuerySQLConfigs(removeQueryColumnNames(properties)) ++
catalogAndNamespaceToProps(manager.currentCatalog.name, manager.currentNamespace) ++
generateQueryColumnNames(queryOutput)
generateQueryColumnNames(queryOutput) ++
generateQuerySQLConfigs(conf)
}
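For orientation, a rough illustration (assumed keys and values, not actual output) of the view properties produced after this change for a single-column view created in the default database of the session catalog, assuming spark.sql.ansi.enabled is the only config set on the session:

Map(
  "view.catalogAndNamespace.numParts" -> "2",
  "view.catalogAndNamespace.part.0"   -> "spark_catalog",
  "view.catalogAndNamespace.part.1"   -> "default",
  "view.query.out.numCols"            -> "1",
  "view.query.out.col.0"              -> "c",
  "view.query.sqlConfigs"             -> """{"spark.sql.ansi.enabled":"true"}"""
)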
