Commit 1e3118c

[SPARK-22977][SQL] fix web UI SQL tab for CTAS
## What changes were proposed in this pull request?

This is a regression in Spark 2.3. In Spark 2.2, we had fragile UI support for SQL data writing commands: we only tracked the input query plan of `FileFormatWriter` and displayed its metrics. This was not ideal because we didn't know what triggered the write (a table insertion, a CTAS, etc.), but it was still useful to see the metrics of the input query.

In Spark 2.3, we introduced a new mechanism, `DataWritingCommand`, to fix the UI issue entirely. These writing commands now have real children, and we no longer need to hack into `FileFormatWriter` for the UI. This also helps with `explain`: it can now show the physical plan of the input query, whereas in 2.2 the physical writing plan was simply `ExecutedCommandExec` with no child.

However, there is a regression for CTAS. CTAS commands don't extend `DataWritingCommand`, and the UI hack in `FileFormatWriter` is gone, so the UI for CTAS is just an empty node. See https://issues.apache.org/jira/browse/SPARK-22977 for more information about this UI issue.

To fix it, this patch applies the `DataWritingCommand` mechanism to CTAS commands.

TODO: in the future, we should refactor this part and create some physical-layer code pieces for data writing, and reuse them in the different writing commands. We should have different logical nodes for different operators even if some of them share the same logic, e.g. CTAS, CREATE TABLE, INSERT TABLE; internally they can share the same physical logic.

## How was this patch tested?

Manually tested.

For a data source table:

<img width="644" alt="1" src="https://user-images.githubusercontent.com/3182036/35874155-bdffab28-0ba6-11e8-94a8-e32e106ba069.png">

For a Hive table:

<img width="666" alt="2" src="https://user-images.githubusercontent.com/3182036/35874161-c437e2a8-0ba6-11e8-98ed-7930f01432c5.png">

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20521 from cloud-fan/UI.

(cherry picked from commit 0e2c266)

Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 79e8650 commit 1e3118c
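
To make the mechanism concrete, here is a minimal sketch of the `DataWritingCommand` contract this patch relies on, inferred from the signatures used in the diffs below. The real trait lives in `org.apache.spark.sql.execution.command` and carries additional members (e.g. write metrics); this is an illustration, not the Spark source:

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// Simplified sketch: a writing command keeps its input query as a real child,
// so the UI and explain can show that plan, and it is executed with the
// already-planned physical child so the write metrics attach to the plan
// rendered in the SQL tab.
trait DataWritingCommandSketch {
  def query: LogicalPlan                         // the input query, a real child of the command
  def children: Seq[LogicalPlan] = query :: Nil
  def outputColumns: Seq[Attribute]              // analyzed output, preserving the user's column names
  def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]  // child = planned `query`
}
```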

File tree

6 files changed (+80, -70 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 11 additions & 10 deletions
```diff
@@ -21,7 +21,9 @@ import java.net.URI
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
@@ -136,12 +138,11 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
 case class CreateDataSourceTableAsSelectCommand(
     table: CatalogTable,
     mode: SaveMode,
-    query: LogicalPlan)
-  extends RunnableCommand {
-
-  override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
+    query: LogicalPlan,
+    outputColumns: Seq[Attribute])
+  extends DataWritingCommand {
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
+  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     assert(table.tableType != CatalogTableType.VIEW)
     assert(table.provider.isDefined)
 
@@ -163,7 +164,7 @@ case class CreateDataSourceTableAsSelectCommand(
      }
 
      saveDataIntoTable(
-        sparkSession, table, table.storage.locationUri, query, SaveMode.Append, tableExists = true)
+        sparkSession, table, table.storage.locationUri, child, SaveMode.Append, tableExists = true)
    } else {
      assert(table.schema.isEmpty)
 
@@ -173,7 +174,7 @@ case class CreateDataSourceTableAsSelectCommand(
        table.storage.locationUri
      }
      val result = saveDataIntoTable(
-        sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false)
+        sparkSession, table, tableLocation, child, SaveMode.Overwrite, tableExists = false)
      val newTable = table.copy(
        storage = table.storage.copy(locationUri = tableLocation),
        // We will use the schema of resolved.relation as the schema of the table (instead of
@@ -198,10 +199,10 @@ case class CreateDataSourceTableAsSelectCommand(
      session: SparkSession,
      table: CatalogTable,
      tableLocation: Option[URI],
-      data: LogicalPlan,
+      physicalPlan: SparkPlan,
      mode: SaveMode,
      tableExists: Boolean): BaseRelation = {
    // Create the relation based on the input logical plan: `data`.
    // Create the relation based on the input logical plan: `query`.
    val pathOption = tableLocation.map("path" -> CatalogUtils.URIToString(_))
    val dataSource = DataSource(
      session,
@@ -212,7 +213,7 @@ case class CreateDataSourceTableAsSelectCommand(
      catalogTable = if (tableExists) Some(table) else None)
 
    try {
-      dataSource.writeAndRead(mode, query)
+      dataSource.writeAndRead(mode, query, outputColumns, physicalPlan)
    } catch {
      case ex: AnalysisException =>
        logError(s"Failed to write to table ${table.identifier.unquotedString}", ex)
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 38 additions & 6 deletions
```diff
@@ -31,8 +31,10 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
@@ -435,10 +437,11 @@ case class DataSource(
   }
 
   /**
-   * Writes the given [[LogicalPlan]] out in this [[FileFormat]].
+   * Creates a command node to write the given [[LogicalPlan]] out to the given [[FileFormat]].
+   * The returned command is unresolved and need to be analyzed.
    */
   private def planForWritingFileFormat(
-      format: FileFormat, mode: SaveMode, data: LogicalPlan): LogicalPlan = {
+      format: FileFormat, mode: SaveMode, data: LogicalPlan): InsertIntoHadoopFsRelationCommand = {
     // Don't glob path for the write path. The contracts here are:
     //  1. Only one output path can be specified on the write path;
     //  2. Output path must be a legal HDFS style file system path;
@@ -482,9 +485,24 @@
   /**
    * Writes the given [[LogicalPlan]] out to this [[DataSource]] and returns a [[BaseRelation]] for
    * the following reading.
+   *
+   * @param mode The save mode for this writing.
+   * @param data The input query plan that produces the data to be written. Note that this plan
+   *             is analyzed and optimized.
+   * @param outputColumns The original output columns of the input query plan. The optimizer may not
+   *                      preserve the output column's names' case, so we need this parameter
+   *                      instead of `data.output`.
+   * @param physicalPlan The physical plan of the input query plan. We should run the writing
+   *                     command with this physical plan instead of creating a new physical plan,
+   *                     so that the metrics can be correctly linked to the given physical plan and
+   *                     shown in the web UI.
    */
-  def writeAndRead(mode: SaveMode, data: LogicalPlan): BaseRelation = {
-    if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
+  def writeAndRead(
+      mode: SaveMode,
+      data: LogicalPlan,
+      outputColumns: Seq[Attribute],
+      physicalPlan: SparkPlan): BaseRelation = {
+    if (outputColumns.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
      throw new AnalysisException("Cannot save interval data type into external storage.")
    }
 
@@ -493,9 +511,23 @@
        dataSource.createRelation(
          sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
      case format: FileFormat =>
-        sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, data)).toRdd
+        val cmd = planForWritingFileFormat(format, mode, data)
+        val resolvedPartCols = cmd.partitionColumns.map { col =>
+          // The partition columns created in `planForWritingFileFormat` should always be
+          // `UnresolvedAttribute` with a single name part.
+          assert(col.isInstanceOf[UnresolvedAttribute])
+          val unresolved = col.asInstanceOf[UnresolvedAttribute]
+          assert(unresolved.nameParts.length == 1)
+          val name = unresolved.nameParts.head
+          outputColumns.find(a => equality(a.name, name)).getOrElse {
+            throw new AnalysisException(
+              s"Unable to resolve $name given [${data.output.map(_.name).mkString(", ")}]")
+          }
+        }
+        val resolved = cmd.copy(partitionColumns = resolvedPartCols, outputColumns = outputColumns)
+        resolved.run(sparkSession, physicalPlan)
        // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
-        copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
+        copy(userSpecifiedSchema = Some(outputColumns.toStructType.asNullable)).resolveRelation()
      case _ =>
        sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
    }
```
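
A side note on the new `outputColumns` parameter: a quick way to see why the analyzed output is carried explicitly, rather than taken from the possibly-rewritten plan, is a CTAS that aliases a column. This is an illustrative spark-shell snippet, not from the patch; the table name is made up:

```scala
// Assumes a spark-shell with a SparkSession named `spark`; table name is illustrative.
spark.sql("CREATE TABLE ctas_case USING parquet AS SELECT id AS ID FROM range(3)")
spark.table("ctas_case").printSchema()  // the column keeps the name ID, as written in the query
```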

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -139,7 +139,7 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
    case CreateTable(tableDesc, mode, Some(query))
        if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
      DDLUtils.checkDataColNames(tableDesc.copy(schema = query.schema))
-      CreateDataSourceTableAsSelectCommand(tableDesc, mode, query)
+      CreateDataSourceTableAsSelectCommand(tableDesc, mode, query, query.output)
 
    case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _, _),
        parts, query, overwrite, false) if parts.isEmpty =>
```

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -157,7 +157,7 @@ object HiveAnalysis extends Rule[LogicalPlan] {
 
    case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
      DDLUtils.checkDataColNames(tableDesc)
-      CreateHiveTableAsSelectCommand(tableDesc, query, mode)
+      CreateHiveTableAsSelectCommand(tableDesc, query, query.output, mode)
 
    case InsertIntoDir(isLocal, storage, provider, child, overwrite)
        if DDLUtils.isHiveTable(provider) =>
```

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala

Lines changed: 29 additions & 26 deletions
```diff
@@ -20,10 +20,11 @@ package org.apache.spark.sql.hive.execution
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
-import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.DataWritingCommand
 
 
 /**
@@ -36,15 +37,15 @@ import org.apache.spark.sql.execution.command.RunnableCommand
 case class CreateHiveTableAsSelectCommand(
     tableDesc: CatalogTable,
     query: LogicalPlan,
+    outputColumns: Seq[Attribute],
     mode: SaveMode)
-  extends RunnableCommand {
+  extends DataWritingCommand {
 
   private val tableIdentifier = tableDesc.identifier
 
-  override def innerChildren: Seq[LogicalPlan] = Seq(query)
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
+  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    if (catalog.tableExists(tableIdentifier)) {
      assert(mode != SaveMode.Overwrite,
        s"Expect the table $tableIdentifier has been dropped when the save mode is Overwrite")
 
@@ -56,34 +57,36 @@ case class CreateHiveTableAsSelectCommand(
        return Seq.empty
      }
 
-      sparkSession.sessionState.executePlan(
-        InsertIntoTable(
-          UnresolvedRelation(tableIdentifier),
-          Map(),
-          query,
-          overwrite = false,
-          ifPartitionNotExists = false)).toRdd
+      InsertIntoHiveTable(
+        tableDesc,
+        Map.empty,
+        query,
+        overwrite = false,
+        ifPartitionNotExists = false,
+        outputColumns = outputColumns).run(sparkSession, child)
    } else {
      // TODO ideally, we should get the output data ready first and then
      // add the relation into catalog, just in case of failure occurs while data
      // processing.
      assert(tableDesc.schema.isEmpty)
-      sparkSession.sessionState.catalog.createTable(
-        tableDesc.copy(schema = query.schema), ignoreIfExists = false)
+      catalog.createTable(tableDesc.copy(schema = query.schema), ignoreIfExists = false)
 
      try {
-        sparkSession.sessionState.executePlan(
-          InsertIntoTable(
-            UnresolvedRelation(tableIdentifier),
-            Map(),
-            query,
-            overwrite = true,
-            ifPartitionNotExists = false)).toRdd
+        // Read back the metadata of the table which was created just now.
+        val createdTableMeta = catalog.getTableMetadata(tableDesc.identifier)
+        // For CTAS, there is no static partition values to insert.
+        val partition = createdTableMeta.partitionColumnNames.map(_ -> None).toMap
+        InsertIntoHiveTable(
+          createdTableMeta,
+          partition,
+          query,
+          overwrite = true,
+          ifPartitionNotExists = false,
+          outputColumns = outputColumns).run(sparkSession, child)
      } catch {
        case NonFatal(e) =>
          // drop the created table.
-          sparkSession.sessionState.catalog.dropTable(tableIdentifier, ignoreIfNotExists = true,
-            purge = false)
+          catalog.dropTable(tableIdentifier, ignoreIfNotExists = true, purge = false)
          throw e
      }
    }
```
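
The all-dynamic partition map built in the `else` branch above is terse, so here is a standalone illustration of the value it produces for a partitioned CTAS, where every partition column maps to `None` because CTAS has no static partition values. The column names are invented for the example:

```scala
// Standalone illustration of createdTableMeta.partitionColumnNames.map(_ -> None).toMap
val partitionColumnNames = Seq("ds", "country")  // assumed example partition columns
val partition: Map[String, Option[String]] = partitionColumnNames.map(_ -> None).toMap
println(partition)  // Map(ds -> None, country -> None)
```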

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala

Lines changed: 0 additions & 26 deletions
```diff
@@ -128,32 +128,6 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
       "src")
   }
 
-  test("SPARK-17409: The EXPLAIN output of CTAS only shows the analyzed plan") {
-    withTempView("jt") {
-      val ds = (1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""").toDS()
-      spark.read.json(ds).createOrReplaceTempView("jt")
-      val outputs = sql(
-        s"""
-           |EXPLAIN EXTENDED
-           |CREATE TABLE t1
-           |AS
-           |SELECT * FROM jt
-         """.stripMargin).collect().map(_.mkString).mkString
-
-      val shouldContain =
-        "== Parsed Logical Plan ==" :: "== Analyzed Logical Plan ==" :: "Subquery" ::
-        "== Optimized Logical Plan ==" :: "== Physical Plan ==" ::
-        "CreateHiveTableAsSelect" :: "InsertIntoHiveTable" :: "jt" :: Nil
-      for (key <- shouldContain) {
-        assert(outputs.contains(key), s"$key doesn't exist in result")
-      }
-
-      val physicalIndex = outputs.indexOf("== Physical Plan ==")
-      assert(outputs.substring(physicalIndex).contains("Subquery"),
-        "Physical Plan should contain SubqueryAlias since the query should not be optimized")
-    }
-  }
-
   test("explain output of physical plan should contain proper codegen stage ID") {
     checkKeywordsExist(sql(
       """
```
