[SPARK-22977][SQL] fix web UI SQL tab for CTAS #20521

Closed · wants to merge 4 commits

@@ -21,7 +21,9 @@ import java.net.URI

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.StructType
@@ -136,12 +138,11 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
case class CreateDataSourceTableAsSelectCommand(
table: CatalogTable,
mode: SaveMode,
query: LogicalPlan)
extends RunnableCommand {

override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
query: LogicalPlan,
outputColumns: Seq[Attribute])
extends DataWritingCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
assert(table.tableType != CatalogTableType.VIEW)
assert(table.provider.isDefined)

@@ -163,7 +164,7 @@ case class CreateDataSourceTableAsSelectCommand(
}

saveDataIntoTable(
sparkSession, table, table.storage.locationUri, query, SaveMode.Append, tableExists = true)
sparkSession, table, table.storage.locationUri, child, SaveMode.Append, tableExists = true)
} else {
assert(table.schema.isEmpty)

@@ -173,7 +174,7 @@
table.storage.locationUri
}
val result = saveDataIntoTable(
sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false)
sparkSession, table, tableLocation, child, SaveMode.Overwrite, tableExists = false)
val newTable = table.copy(
storage = table.storage.copy(locationUri = tableLocation),
// We will use the schema of resolved.relation as the schema of the table (instead of
@@ -198,10 +199,10 @@
session: SparkSession,
table: CatalogTable,
tableLocation: Option[URI],
data: LogicalPlan,
physicalPlan: SparkPlan,
mode: SaveMode,
tableExists: Boolean): BaseRelation = {
// Create the relation based on the input logical plan: `data`.
// Create the relation based on the input logical plan: `query`.
val pathOption = tableLocation.map("path" -> CatalogUtils.URIToString(_))
val dataSource = DataSource(
session,
@@ -212,7 +213,7 @@
catalogTable = if (tableExists) Some(table) else None)

try {
dataSource.writeAndRead(mode, query)
dataSource.writeAndRead(mode, query, outputColumns, physicalPlan)
} catch {
case ex: AnalysisException =>
logError(s"Failed to write to table ${table.identifier.unquotedString}", ex)
@@ -31,8 +31,10 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
@@ -435,10 +437,11 @@ case class DataSource(
}

/**
* Writes the given [[LogicalPlan]] out in this [[FileFormat]].
* Creates a command node to write the given [[LogicalPlan]] out to the given [[FileFormat]].
* The returned command is unresolved and needs to be analyzed.
*/
private def planForWritingFileFormat(
format: FileFormat, mode: SaveMode, data: LogicalPlan): LogicalPlan = {
format: FileFormat, mode: SaveMode, data: LogicalPlan): InsertIntoHadoopFsRelationCommand = {
// Don't glob path for the write path. The contracts here are:
// 1. Only one output path can be specified on the write path;
// 2. Output path must be a legal HDFS style file system path;
@@ -482,9 +485,24 @@
/**
* Writes the given [[LogicalPlan]] out to this [[DataSource]] and returns a [[BaseRelation]] for
* the following reading.
*
* @param mode The save mode for this writing.
* @param data The input query plan that produces the data to be written. Note that this plan
* is analyzed and optimized.
* @param outputColumns The original output columns of the input query plan. The optimizer may not
* preserve the output column's names' case, so we need this parameter
* instead of `data.output`.
* @param physicalPlan The physical plan of the input query plan. We should run the writing
* command with this physical plan instead of creating a new physical plan,
* so that the metrics can be correctly linked to the given physical plan and
* shown in the web UI.
Contributor Author (cloud-fan):
Generally I think it's hacky to analyze/optimize/plan/execute a query during the execution of another query. Not only CTAS; other commands like CreateView, CacheTable, etc. also have this issue. This is a surgical fix for Spark 2.3, so I didn't change this part and will leave it for 2.4.

*/
def writeAndRead(mode: SaveMode, data: LogicalPlan): BaseRelation = {
if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
def writeAndRead(
mode: SaveMode,
data: LogicalPlan,
outputColumns: Seq[Attribute],
physicalPlan: SparkPlan): BaseRelation = {
if (outputColumns.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
throw new AnalysisException("Cannot save interval data type into external storage.")
}

@@ -493,9 +511,23 @@
dataSource.createRelation(
sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
case format: FileFormat =>
sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, data)).toRdd
val cmd = planForWritingFileFormat(format, mode, data)
val resolvedPartCols = cmd.partitionColumns.map { col =>
// The partition columns created in `planForWritingFileFormat` should always be
// `UnresolvedAttribute` with a single name part.
assert(col.isInstanceOf[UnresolvedAttribute])
val unresolved = col.asInstanceOf[UnresolvedAttribute]
assert(unresolved.nameParts.length == 1)
val name = unresolved.nameParts.head
outputColumns.find(a => equality(a.name, name)).getOrElse {
throw new AnalysisException(
s"Unable to resolve $name given [${data.output.map(_.name).mkString(", ")}]")
}
}
val resolved = cmd.copy(partitionColumns = resolvedPartCols, outputColumns = outputColumns)
Contributor:
Is this ad-hoc column resolution just to ensure the names have the correct case after it is possibly dropped by the optimizer? Why does the command need to report these and where are they used?

Contributor Author (cloud-fan), Feb 6, 2018:

The previous code calls sparkSession.sessionState.executePlan to analyze/optimize/plan/execute this temporary InsertIntoHadoopFsRelationCommand, which is pretty hacky because at this moment we are executing CTAS and we already have the final physical plan. Here we manually analyze the InsertIntoHadoopFsRelationCommand so that we only need to reuse the physical plan we already have.

Contributor:
I get the point of passing the physical plan, and I think that's a good idea. What I don't understand is why the command doesn't match the physical plan that is passed in. Is that physical plan based on a different logical plan? I would expect that the physical plan is created once and passed into run, but that it was created from the logical plan that is also passed in.

Contributor Author (cloud-fan):
The given physical plan has been registered to the UI, and we can collect its metrics if we execute it. However, if we run sparkSession.sessionState.executePlan, we get a new physical plan which is semantically equal to the given physical plan but is not the same object. This new physical plan is not registered to the UI, so we can't show metrics correctly in the UI.
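To make the object-identity point concrete, here is a self-contained toy sketch (all names invented; this is not Spark's UI code): a metrics store keyed by reference identity only sees updates reported against the exact plan instance that was registered, not against an equal copy.

```scala
import java.util.IdentityHashMap

object PlanIdentityDemo {
  // Stand-in for a physical plan node: two instances planned from the same
  // query are equal as case classes, but they are distinct objects.
  final case class FakePlan(name: String)

  // Metrics keyed by reference identity, mimicking a UI that only tracks the
  // plan instance it was handed when the query was registered.
  private val metrics = new IdentityHashMap[FakePlan, java.lang.Long]()

  def register(plan: FakePlan): Unit = metrics.put(plan, 0L)

  def report(plan: FakePlan, rows: Long): Unit =
    if (metrics.containsKey(plan)) metrics.put(plan, metrics.get(plan) + rows)

  def main(args: Array[String]): Unit = {
    val registered = FakePlan("write")  // the instance the "UI" displays
    register(registered)

    val reExecuted = FakePlan("write")  // semantically equal, different object
    report(reExecuted, 100L)            // dropped: not the registered instance
    report(registered, 100L)            // recorded: updates the displayed plan

    println(registered == reExecuted)   // true  (equal)
    println(registered eq reExecuted)   // false (not the same object)
    println(metrics.get(registered))    // 100
  }
}
```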

Contributor:
I don't think you're answering the question that I'm asking. I understand why the physical plan is passed in.

Why does the physical plan not match the command that is produced, or why doesn't the command here match the physical plan? I don't see why executePlan would produce something different than what is passed in.

Contributor Author (cloud-fan):
> Why does the physical plan not match the command that is produced?

It matches! The only problem is that they are two different JVM objects. The UI keeps the physical plan object and displays it. An alternative solution is to swap the new physical plan into the UI side, but that's hard to do with the current UI framework.

If we run sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, data)).toRdd, we are executing the new physical plan, so no metrics will be reported against the passed-in physical plan or shown in the UI.

Contributor:
If it matches, then why is there column resolution happening?

Contributor Author (cloud-fan):
planForWritingFileFormat returns an unresolved InsertIntoHadoopFsRelationCommand. The previous code runs sparkSession.sessionState.executePlan to analyze/.../execute the unresolved InsertIntoHadoopFsRelationCommand, which converts the logical plan to a physical plan and executes it.

Now, I want to run the given physical plan. Even though sparkSession.sessionState.executePlan can produce an exactly equivalent physical plan, they are different objects and the UI is not happy with that, so I chose to manually resolve the unresolved InsertIntoHadoopFsRelationCommand here and explicitly ask it to run my given physical plan.
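The "manual resolution" in the diff above boils down to looking up each unresolved partition-column name in outputColumns under the session's name equality. A standalone sketch of that lookup, with simplified stand-in types rather than Spark's Attribute and resolver:

```scala
object ResolveByNameSketch {
  // Simplified stand-ins; Spark's Attribute and Resolver carry much more.
  final case class Attr(name: String)
  type Resolver = (String, String) => Boolean

  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)
  val caseSensitive: Resolver = (a, b) => a == b

  // Mirrors the lookup in the diff above: find the output attribute whose
  // name matches the unresolved name, or fail with the candidate names.
  def resolve(name: String, output: Seq[Attr], equality: Resolver): Attr =
    output.find(a => equality(a.name, name)).getOrElse {
      throw new IllegalArgumentException(
        s"Unable to resolve $name given [${output.map(_.name).mkString(", ")}]")
    }

  def main(args: Array[String]): Unit = {
    // The unresolved name "id" still resolves against an output column whose
    // original casing is "ID", and the returned attribute keeps that casing.
    val outputColumns = Seq(Attr("ID"), Attr("value"))
    println(resolve("id", outputColumns, caseInsensitive))  // Attr(ID)
    println(resolve("value", outputColumns, caseSensitive)) // Attr(value)
  }
}
```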

Contributor:
Ok, I think I get it now. Thanks for explaining.

Why not pass the QueryExecution so you have access to the resolved plan without copying resolution rules here? I'm just curious; I get that this is intended as a quick fix for the release, so don't let my comments block you.

Contributor Author (cloud-fan):
Because the CTAS node itself doesn't have a QueryExecution. It only has the physical plan that was produced by the planner. A QueryExecution only exists in the places that drive the analyze/.../execute cycle, e.g. Dataset.
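The signature change that enables this is visible throughout the diff: the command no longer re-plans its query internally but is handed the already-planned physical child. A minimal sketch of the two shapes, using stand-in traits rather than Spark's actual RunnableCommand / DataWritingCommand hierarchy (the real traits live in org.apache.spark.sql.execution.command):

```scala
// Stand-ins only; not Spark's actual types.
trait FakeSession
trait FakeSparkPlan
trait FakeRow

// Old shape: the command plans and executes its logical query itself, which
// produces a fresh physical plan the web UI never registered.
trait FakeRunnableCommand {
  def run(session: FakeSession): Seq[FakeRow]
}

// New shape: the already-planned, UI-registered physical child is passed in,
// so the command executes exactly the plan instance whose metrics are shown.
trait FakeDataWritingCommand {
  def run(session: FakeSession, child: FakeSparkPlan): Seq[FakeRow]
}
```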

resolved.run(sparkSession, physicalPlan)
// Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
copy(userSpecifiedSchema = Some(outputColumns.toStructType.asNullable)).resolveRelation()
case _ =>
sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
}
@@ -139,7 +139,7 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
case CreateTable(tableDesc, mode, Some(query))
if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
DDLUtils.checkDataColNames(tableDesc.copy(schema = query.schema))
CreateDataSourceTableAsSelectCommand(tableDesc, mode, query)
CreateDataSourceTableAsSelectCommand(tableDesc, mode, query, query.output)

case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _, _),
parts, query, overwrite, false) if parts.isEmpty =>
@@ -157,7 +157,7 @@ object HiveAnalysis extends Rule[LogicalPlan] {

case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
DDLUtils.checkDataColNames(tableDesc)
CreateHiveTableAsSelectCommand(tableDesc, query, mode)
CreateHiveTableAsSelectCommand(tableDesc, query, query.output, mode)

case InsertIntoDir(isLocal, storage, provider, child, overwrite)
if DDLUtils.isHiveTable(provider) =>
@@ -20,10 +20,11 @@ package org.apache.spark.sql.hive.execution
import scala.util.control.NonFatal

import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand


/**
@@ -36,15 +37,15 @@ import org.apache.spark.sql.execution.command.RunnableCommand
case class CreateHiveTableAsSelectCommand(
tableDesc: CatalogTable,
query: LogicalPlan,
outputColumns: Seq[Attribute],
mode: SaveMode)
extends RunnableCommand {
extends DataWritingCommand {

private val tableIdentifier = tableDesc.identifier

override def innerChildren: Seq[LogicalPlan] = Seq(query)

override def run(sparkSession: SparkSession): Seq[Row] = {
if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
if (catalog.tableExists(tableIdentifier)) {
assert(mode != SaveMode.Overwrite,
s"Expect the table $tableIdentifier has been dropped when the save mode is Overwrite")

@@ -56,34 +57,36 @@ case class CreateHiveTableAsSelectCommand(
return Seq.empty
}

sparkSession.sessionState.executePlan(
InsertIntoTable(
UnresolvedRelation(tableIdentifier),
Map(),
query,
overwrite = false,
ifPartitionNotExists = false)).toRdd
InsertIntoHiveTable(
Member:
@cloud-fan this change from InsertIntoTable to InsertIntoHiveTable introduces a regression (SPARK-25271); I'd like to revert it back to using InsertIntoTable, WDYT?

Member:
Will it affect the web UI SQL tab?

Contributor Author (cloud-fan):
Ah, good catch! I don't think we can revert here, as we need to execute the physical plan given as a parameter.

I think we should improve the Hive table conversion optimizer rule and handle CTAS as well.

Member:
Ok. I see. Thanks.

tableDesc,
Map.empty,
query,
overwrite = false,
ifPartitionNotExists = false,
outputColumns = outputColumns).run(sparkSession, child)
} else {
// TODO ideally, we should get the output data ready first and then
// add the relation into catalog, just in case a failure occurs while
// processing the data.
assert(tableDesc.schema.isEmpty)
sparkSession.sessionState.catalog.createTable(
tableDesc.copy(schema = query.schema), ignoreIfExists = false)
catalog.createTable(tableDesc.copy(schema = query.schema), ignoreIfExists = false)

try {
sparkSession.sessionState.executePlan(
InsertIntoTable(
UnresolvedRelation(tableIdentifier),
Map(),
query,
overwrite = true,
ifPartitionNotExists = false)).toRdd
// Read back the metadata of the table which was created just now.
val createdTableMeta = catalog.getTableMetadata(tableDesc.identifier)
// For CTAS, there are no static partition values to insert.
val partition = createdTableMeta.partitionColumnNames.map(_ -> None).toMap
InsertIntoHiveTable(
createdTableMeta,
partition,
query,
overwrite = true,
ifPartitionNotExists = false,
outputColumns = outputColumns).run(sparkSession, child)
} catch {
case NonFatal(e) =>
// drop the created table.
sparkSession.sessionState.catalog.dropTable(tableIdentifier, ignoreIfNotExists = true,
purge = false)
catalog.dropTable(tableIdentifier, ignoreIfNotExists = true, purge = false)
throw e
}
}
@@ -128,32 +128,6 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
"src")
}

test("SPARK-17409: The EXPLAIN output of CTAS only shows the analyzed plan") {
Contributor Author (cloud-fan):
This is kind of a "bad" test. The bug was that we optimized the CTAS input query twice, but here we are testing whether the EXPLAIN result of CTAS only contains the analyzed query, which is specific to how we fixed that bug at the time.

withTempView("jt") {
val ds = (1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""").toDS()
spark.read.json(ds).createOrReplaceTempView("jt")
val outputs = sql(
s"""
|EXPLAIN EXTENDED
|CREATE TABLE t1
|AS
|SELECT * FROM jt
""".stripMargin).collect().map(_.mkString).mkString

val shouldContain =
"== Parsed Logical Plan ==" :: "== Analyzed Logical Plan ==" :: "Subquery" ::
"== Optimized Logical Plan ==" :: "== Physical Plan ==" ::
"CreateHiveTableAsSelect" :: "InsertIntoHiveTable" :: "jt" :: Nil
for (key <- shouldContain) {
assert(outputs.contains(key), s"$key doesn't exist in result")
}

val physicalIndex = outputs.indexOf("== Physical Plan ==")
assert(outputs.substring(physicalIndex).contains("Subquery"),
"Physical Plan should contain SubqueryAlias since the query should not be optimized")
}
}

test("explain output of physical plan should contain proper codegen stage ID") {
checkKeywordsExist(sql(
"""