-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-33141][SQL] Capture SQL configs when creating permanent views #30289
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1448,6 +1448,15 @@ object SQLConf { | |
"must be positive.") | ||
.createWithDefault(100) | ||
|
||
// Internal legacy flag (added in 3.1.0): when true (the default), the SQL configs
// captured at view creation time are applied during the parsing and analysis
// phases of view resolution; when false, views resolve under the current
// session's configs instead.
// NOTE(review): the key lives in the `spark.sql.legacy.` namespace but defaults
// to the NEW behavior — the reviewer on this change asked for a name describing
// the legacy behavior instead; confirm the final key follows that convention.
val APPLY_VIEW_SQL_CONFIGS =
  buildConf("spark.sql.legacy.view.applySQLConfigs")
    .internal()
    .doc("When true, captured SQL Configs will be applied during the parsing and analysis " +
      "phases of the view resolution.")
    .version("3.1.0")
    .booleanConf
    .createWithDefault(true)
|
||
val STREAMING_FILE_COMMIT_PROTOCOL_CLASS = | ||
buildConf("spark.sql.streaming.commitProtocolClass") | ||
.version("2.1.0") | ||
|
@@ -3385,6 +3394,8 @@ class SQLConf extends Serializable with Logging { | |
|
||
def maxNestedViewDepth: Int = getConf(SQLConf.MAX_NESTED_VIEW_DEPTH)

// Whether SQL configs captured into a view's properties at creation time are
// re-applied when the view is parsed/analyzed (see APPLY_VIEW_SQL_CONFIGS).
def applyViewSQLConfigs: Boolean = getConf(SQLConf.APPLY_VIEW_SQL_CONFIGS)

def starSchemaDetection: Boolean = getConf(STARSCHEMA_DETECTION)

def starSchemaFTRatio: Double = getConf(STARSCHEMA_FACT_TABLE_RATIO)
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.command | |
|
||
import scala.collection.mutable | ||
|
||
import org.json4s.jackson.JsonMethods._ | ||
|
||
import org.apache.spark.sql.{AnalysisException, Row, SparkSession} | ||
import org.apache.spark.sql.catalyst.TableIdentifier | ||
import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, PersistedView, UnresolvedFunction, UnresolvedRelation, ViewType} | ||
|
@@ -27,9 +29,10 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeRef | |
import org.apache.spark.sql.catalyst.plans.QueryPlan | ||
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View} | ||
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper | ||
import org.apache.spark.sql.internal.StaticSQLConf | ||
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} | ||
import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StringType} | ||
import org.apache.spark.sql.util.SchemaUtils | ||
import org.apache.spark.util.JsonProtocol | ||
|
||
/** | ||
* Create or replace a view with given query plan. This command will generate some view-specific | ||
|
@@ -334,6 +337,21 @@ case class ShowViewsCommand( | |
|
||
object ViewHelper { | ||
|
||
// Config keys/prefixes that must NOT be captured into a view's properties.
// The first entry is an exact key; the rest are prefixes matched with
// `startsWith` (see `isConfigBlacklisted`). These concern planner/execution
// tuning rather than the semantics of the view's query.
// NOTE(review): despite the name, this mixes exact keys and prefixes — the
// review thread suggests a clearer name; confirm the final naming.
private val configPrefixBlacklist = Seq(
  SQLConf.MAX_NESTED_VIEW_DEPTH.key,
  "spark.sql.optimizer.",
  "spark.sql.codegen.",
  "spark.sql.execution.",
  "spark.sql.shuffle.",
  "spark.sql.adaptive.")
|
||
/**
 * Returns true if the given SQL config key must not be captured into view
 * properties, i.e. it starts with (or equals) an entry of
 * `configPrefixBlacklist`.
 *
 * Rewritten from a `for`-loop with a nonlocal `return` (which Scala implements
 * by throwing `NonLocalReturnControl`) to the equivalent `exists` call.
 */
private def isConfigBlacklisted(key: String): Boolean = {
  configPrefixBlacklist.exists(prefix => key.startsWith(prefix))
}
|
||
import CatalogTable._ | ||
|
||
/** | ||
|
@@ -361,11 +379,38 @@ object ViewHelper { | |
} | ||
} | ||
|
||
/** | ||
* Convert the view query SQL configs in `properties`. | ||
*/ | ||
/**
 * Capture the SQL configs to be stored in a view's `properties`.
 *
 * Only configs that are currently modifiable (`conf.isModifiable`) and not
 * blacklisted (`isConfigBlacklisted`) are captured. When any such config
 * exists, the whole map is serialized as one JSON string under the
 * `VIEW_QUERY_SQL_CONFIGS` property key; otherwise an empty map is returned.
 *
 * Rewritten to build the result as a pure expression instead of populating a
 * `mutable.HashMap` and converting it with `toMap`.
 *
 * NOTE(review): packing every config into a single JSON property value may hit
 * the Hive metastore's property-value length limit for sessions with many
 * modified configs (raised in the review thread) — confirm, or store one
 * config per property entry.
 */
private def generateQuerySQLConfigs(conf: SQLConf): Map[String, String] = {
  val capturedConfs = conf.getAllConfs.filter { case (key, _) =>
    conf.isModifiable(key) && !isConfigBlacklisted(key)
  }
  if (capturedConfs.isEmpty) {
    Map.empty
  } else {
    // `compact(render(...))` serializes the map as a single-line JSON object.
    Map(VIEW_QUERY_SQL_CONFIGS -> compact(render(JsonProtocol.mapToJson(capturedConfs))))
  }
}
|
||
/** | ||
* Remove the view query SQL configs in `properties`. | ||
*/ | ||
/**
 * Drop the captured SQL-config entry (`VIEW_QUERY_SQL_CONFIGS`) from a view's
 * `properties`, leaving all other entries untouched.
 *
 * `Map.-` is used rather than `filterKeys`: `filterKeys` returns a
 * non-serializable view, while `CatalogTable` (which holds these properties)
 * must remain serializable.
 */
private def removeQuerySQLConfigs(properties: Map[String, String]): Map[String, String] = {
  properties - VIEW_QUERY_SQL_CONFIGS
}
|
||
/** | ||
* Generate the view properties in CatalogTable, including: | ||
* 1. view default database that is used to provide the default database name on view resolution. | ||
* 2. the output column names of the query that creates a view, this is used to map the output of | ||
* the view child to the view output during view resolution. | ||
* 3. the SQL configs when creating the view. | ||
* | ||
* @param properties the `properties` in CatalogTable. | ||
* @param session the spark session. | ||
|
@@ -380,16 +425,19 @@ object ViewHelper { | |
// for createViewCommand queryOutput may be different from fieldNames | ||
val queryOutput = analyzedPlan.schema.fieldNames | ||
|
||
val conf = SQLConf.get | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since we have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
|
||
// Generate the query column names, throw an AnalysisException if there exists duplicate column | ||
// names. | ||
SchemaUtils.checkColumnNameDuplication( | ||
fieldNames, "in the view definition", session.sessionState.conf.resolver) | ||
fieldNames, "in the view definition", conf.resolver) | ||
|
||
// Generate the view default catalog and namespace. | ||
val manager = session.sessionState.catalogManager | ||
removeQueryColumnNames(properties) ++ | ||
removeQuerySQLConfigs(removeQueryColumnNames(properties)) ++ | ||
catalogAndNamespaceToProps(manager.currentCatalog.name, manager.currentNamespace) ++ | ||
generateQueryColumnNames(queryOutput) | ||
generateQueryColumnNames(queryOutput) ++ | ||
generateQuerySQLConfigs(conf) | ||
} | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is similar to current catalog/namespace, which is about the context, not query. Can we define it close to
VIEW_CATALOG_AND_NAMESPACE
and follow it's property key naming?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done