@@ -17,10 +17,10 @@
 
 package org.apache.spark.sql.execution.command
 
-import scala.util.control.NonFatal
+import scala.collection.mutable
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.Alias
@@ -64,9 +64,9 @@ object PersistedView extends ViewType
 
 
 /**
- * Create or replace a view with given query plan. This command will convert the query plan to
- * canonicalized SQL string, and store it as view text in metastore, if we need to create a
- * permanent view.
+ * Create or replace a view with given query plan. This command will generate some view-specific
+ * properties (e.g. view default database, view query output column names) and store them as
+ * properties in metastore, if we need to create a permanent view.
  *
  * @param name the name of this view.
  * @param userSpecifiedColumns the output column names and optional comments specified by users,
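
> A quick way to see the effect described above once the patch is applied — a minimal sketch, assuming a metastore-backed session and made-up names `t` and `v`. `SHOW TBLPROPERTIES` is standard Spark SQL; the exact property key strings are defined by the `CatalogTable` constants used later in this diff and are not spelled out here.

```scala
import org.apache.spark.sql.SparkSession

object InspectViewProps {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("inspect-view-props")
      .enableHiveSupport() // persisted views need a durable catalog
      .getOrCreate()

    spark.sql("CREATE TABLE IF NOT EXISTS t (a INT, b STRING)")
    spark.sql("CREATE OR REPLACE VIEW v AS SELECT a, b FROM t")

    // With this patch, viewText stays the original SQL text and the generated
    // view-specific entries land in the table properties.
    spark.sql("SHOW TBLPROPERTIES v").show(truncate = false)

    spark.stop()
  }
}
```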
@@ -75,8 +75,8 @@ object PersistedView extends ViewType
  * @param properties the properties of this view.
  * @param originalText the original SQL text of this view, can be None if this view is created via
  *                     Dataset API.
- * @param child the logical plan that represents the view; this is used to generate a canonicalized
- *              version of the SQL that can be saved in the catalog.
+ * @param child the logical plan that represents the view; this is used to generate the logical
+ *              plan for a temporary view and the view schema.
  * @param allowExisting if true, and if the view already exists, noop; if false, and if the view
  *                      already exists, throws analysis exception.
  * @param replace if true, and if the view already exists, updates it; if false, and if the view
@@ -95,6 +95,8 @@ case class CreateViewCommand(
     viewType: ViewType)
   extends RunnableCommand {
 
+  import ViewHelper._
+
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
 
   if (viewType == PersistedView) {
@@ -137,22 +139,12 @@ case class CreateViewCommand(
     // This should be called after `qe.assertAnalyzed()` (i.e., `child` can be resolved)
     verifyTemporaryObjectsNotExists(sparkSession)
 
-    val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
-      analyzedPlan
-    } else {
-      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
-        case (attr, (colName, None)) => Alias(attr, colName)()
-        case (attr, (colName, Some(colComment))) =>
-          val meta = new MetadataBuilder().putString("comment", colComment).build()
-          Alias(attr, colName)(explicitMetadata = Some(meta))
-      }
-      sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
-    }
-
     val catalog = sparkSession.sessionState.catalog
     if (viewType == LocalTempView) {
+      val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
       catalog.createTempView(name.table, aliasedPlan, overrideIfExists = replace)
     } else if (viewType == GlobalTempView) {
+      val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
       catalog.createGlobalTempView(name.table, aliasedPlan, overrideIfExists = replace)
     } else if (catalog.tableExists(name)) {
       val tableMetadata = catalog.getTableMetadata(name)
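
> Illustrative only (not part of the patch): which statements reach each branch of `run` above, runnable in `spark-shell` where `spark` is predefined; table and view names are made up.

```scala
spark.sql("CREATE TEMPORARY VIEW tv AS SELECT a FROM t")        // LocalTempView branch
spark.sql("CREATE GLOBAL TEMPORARY VIEW gtv AS SELECT a FROM t") // GlobalTempView branch
spark.sql("CREATE VIEW v AS SELECT a FROM t")                   // persisted: createTable path
spark.sql("CREATE OR REPLACE VIEW v AS SELECT a FROM t")        // persisted + replace: alterTable path
```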
@@ -163,7 +155,7 @@ case class CreateViewCommand(
         throw new AnalysisException(s"$name is not a view")
       } else if (replace) {
         // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
-        catalog.alterTable(prepareTable(sparkSession, aliasedPlan))
+        catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
       } else {
         // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
         // exists.
@@ -173,7 +165,7 @@ case class CreateViewCommand(
       }
     } else {
       // Create the view if it doesn't exist.
-      catalog.createTable(prepareTable(sparkSession, aliasedPlan), ignoreIfExists = false)
+      catalog.createTable(prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
     }
     Seq.empty[Row]
   }
@@ -207,29 +199,44 @@ case class CreateViewCommand(
   }
 
   /**
-   * Returns a [[CatalogTable]] that can be used to save in the catalog. This comment canonicalize
-   * SQL based on the analyzed plan, and also creates the proper schema for the view.
+   * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns,
+   * else return the analyzed plan directly.
    */
-  private def prepareTable(sparkSession: SparkSession, aliasedPlan: LogicalPlan): CatalogTable = {
-    val viewSQL: String = new SQLBuilder(aliasedPlan).toSQL
-
-    // Validate the view SQL - make sure we can parse it and analyze it.
-    // If we cannot analyze the generated query, there is probably a bug in SQL generation.
-    try {
-      sparkSession.sql(viewSQL).queryExecution.assertAnalyzed()
-    } catch {
-      case NonFatal(e) =>
-        throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
+  private def aliasPlan(session: SparkSession, analyzedPlan: LogicalPlan): LogicalPlan = {
+    if (userSpecifiedColumns.isEmpty) {
+      analyzedPlan
+    } else {
+      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
+        case (attr, (colName, None)) => Alias(attr, colName)()
+        case (attr, (colName, Some(colComment))) =>
+          val meta = new MetadataBuilder().putString("comment", colComment).build()
+          Alias(attr, colName)(explicitMetadata = Some(meta))
+      }
+      session.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
     }
+  }
+
+  /**
+   * Returns a [[CatalogTable]] that can be used to save in the catalog. Generates the view-specific
+   * properties (e.g. view default database, view query output column names), stores them as
+   * properties in the CatalogTable, and also creates the proper schema for the view.
+   */
+  private def prepareTable(session: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = {
+    if (originalText.isEmpty) {
+      throw new AnalysisException(
+        "It is not allowed to create a persisted view from the Dataset API")
+    }
+
+    val newProperties = generateViewProperties(properties, session, analyzedPlan)
 
     CatalogTable(
       identifier = name,
       tableType = CatalogTableType.VIEW,
       storage = CatalogStorageFormat.empty,
-      schema = aliasedPlan.schema,
-      properties = properties,
+      schema = aliasPlan(session, analyzedPlan).schema,
+      properties = newProperties,
       viewOriginalText = originalText,
-      viewText = Some(viewSQL),
+      viewText = originalText,
       comment = comment
     )
   }
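
> For intuition, the `Project` of `Alias`es that `aliasPlan` builds matches what the public Dataset API produces for a column rename that carries comment metadata — a hedged sketch, not from the patch, assuming a session `spark` (e.g. in `spark-shell`) and a table `t(a, b)`.

```scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.MetadataBuilder

// Roughly the equivalent of CREATE VIEW v (x COMMENT 'first col', y) AS SELECT a, b FROM t:
// each user-specified column becomes an Alias, with the comment stored as
// explicit column metadata.
val meta = new MetadataBuilder().putString("comment", "first col").build()
val aliased = spark.table("t").select(
  col("a").as("x", meta), // Column.as(alias, metadata) attaches the comment
  col("b").as("y"))
aliased.schema.foreach(f => println(s"${f.name}: ${f.metadata}"))
```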
@@ -244,14 +251,16 @@ case class CreateViewCommand(
  * @param name the name of this view.
  * @param originalText the original SQL text of this view. Note that we can only alter a view by
  *                     SQL API, which means we always have originalText.
- * @param query the logical plan that represents the view; this is used to generate a canonicalized
- *              version of the SQL that can be saved in the catalog.
+ * @param query the logical plan that represents the view; this is used to generate the new view
+ *              schema.
  */
 case class AlterViewAsCommand(
     name: TableIdentifier,
     originalText: String,
     query: LogicalPlan) extends RunnableCommand {
 
+  import ViewHelper._
+
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
 
   override def run(session: SparkSession): Seq[Row] = {
@@ -275,21 +284,80 @@ case class AlterViewAsCommand(
       throw new AnalysisException(s"${viewMeta.identifier} is not a view.")
     }
 
-    val viewSQL: String = new SQLBuilder(analyzedPlan).toSQL
-    // Validate the view SQL - make sure we can parse it and analyze it.
-    // If we cannot analyze the generated query, there is probably a bug in SQL generation.
-    try {
-      session.sql(viewSQL).queryExecution.assertAnalyzed()
-    } catch {
-      case NonFatal(e) =>
-        throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
-    }
+    val newProperties = generateViewProperties(viewMeta.properties, session, analyzedPlan)
 
     val updatedViewMeta = viewMeta.copy(
       schema = analyzedPlan.schema,
+      properties = newProperties,
       viewOriginalText = Some(originalText),
-      viewText = Some(viewSQL))
+      viewText = Some(originalText))
 
     session.sessionState.catalog.alterTable(updatedViewMeta)
   }
 }
+
+object ViewHelper {
+
+  import CatalogTable._
+
+  /**
+   * Generate the view default database in `properties`.
+   */
+  private def generateViewDefaultDatabase(databaseName: String): Map[String, String] = {
+    Map(VIEW_DEFAULT_DATABASE -> databaseName)
+  }
+
+  /**
+   * Generate the view query output column names in `properties`.
+   */
+  private def generateQueryColumnNames(columns: Seq[String]): Map[String, String] = {
+    val props = new mutable.HashMap[String, String]
+    if (columns.nonEmpty) {
+      props.put(VIEW_QUERY_OUTPUT_NUM_COLUMNS, columns.length.toString)
+      columns.zipWithIndex.foreach { case (colName, index) =>
+        props.put(s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index", colName)
+      }
+    }
+    props.toMap
+  }
+
+  /**
+   * Remove the view query output column names in `properties`.
+   */
+  private def removeQueryColumnNames(properties: Map[String, String]): Map[String, String] = {
+    // We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable,
+    // while `CatalogTable` should be serializable.
+    properties.filterNot { case (key, _) =>
+      key.startsWith(VIEW_QUERY_OUTPUT_PREFIX)
+    }
+  }
+
+  /**
+   * Generate the view properties in CatalogTable, including:
+   * 1. view default database that is used to provide the default database name on view resolution.
+   * 2. the output column names of the query that creates a view; this is used to map the output of
+   *    the view child to the view output during view resolution.
+   *
+   * @param properties the `properties` in CatalogTable.
+   * @param session the Spark session.
+   * @param analyzedPlan the analyzed logical plan that represents the child of a view.
+   * @return new view properties including view default database and query column names properties.
+   */
+  def generateViewProperties(
+      properties: Map[String, String],
+      session: SparkSession,
+      analyzedPlan: LogicalPlan): Map[String, String] = {
+    // Generate the query column names, throw an AnalysisException if there exists duplicate column
+    // names.
+    val queryOutput = analyzedPlan.schema.fieldNames
+    assert(queryOutput.distinct.size == queryOutput.size,
+      s"The view output ${queryOutput.mkString("(", ",", ")")} contains duplicate column names.")
+
+    // Generate the view default database name.
+    val viewDefaultDatabase = session.sessionState.catalog.getCurrentDatabase
+
+    removeQueryColumnNames(properties) ++
+      generateViewDefaultDatabase(viewDefaultDatabase) ++
+      generateQueryColumnNames(queryOutput)
+  }
+}
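
> To make the property layout concrete, a self-contained sketch of the map `generateQueryColumnNames` builds. The key strings here are assumptions standing in for the `CatalogTable.VIEW_QUERY_OUTPUT_*` constants, whose actual values are not shown in this diff.

```scala
import scala.collection.mutable

object QueryColumnNamesSketch extends App {
  // Assumed key strings; the real ones are the CatalogTable.VIEW_QUERY_OUTPUT_* constants.
  val VIEW_QUERY_OUTPUT_PREFIX = "view.query.out."
  val VIEW_QUERY_OUTPUT_NUM_COLUMNS = VIEW_QUERY_OUTPUT_PREFIX + "numCols"
  val VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX = VIEW_QUERY_OUTPUT_PREFIX + "col."

  // Same logic as ViewHelper.generateQueryColumnNames in the diff above.
  def generateQueryColumnNames(columns: Seq[String]): Map[String, String] = {
    val props = new mutable.HashMap[String, String]
    if (columns.nonEmpty) {
      props.put(VIEW_QUERY_OUTPUT_NUM_COLUMNS, columns.length.toString)
      columns.zipWithIndex.foreach { case (colName, index) =>
        props.put(s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index", colName)
      }
    }
    props.toMap
  }

  // Prints (iteration order may vary):
  //   view.query.out.numCols -> 3
  //   view.query.out.col.0 -> id
  //   view.query.out.col.1 -> name
  //   view.query.out.col.2 -> age
  generateQueryColumnNames(Seq("id", "name", "age")).foreach { case (k, v) =>
    println(s"$k -> $v")
  }
}
```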