[SPARK-22977][SQL] fix web UI SQL tab for CTAS #20521
@@ -31,8 +31,10 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
@@ -435,10 +437,11 @@ case class DataSource(
  }

  /**
   * Writes the given [[LogicalPlan]] out in this [[FileFormat]].
   * Creates a command node to write the given [[LogicalPlan]] out to the given [[FileFormat]].
   * The returned command is unresolved and needs to be analyzed.
   */
  private def planForWritingFileFormat(
      format: FileFormat, mode: SaveMode, data: LogicalPlan): LogicalPlan = {
      format: FileFormat, mode: SaveMode, data: LogicalPlan): InsertIntoHadoopFsRelationCommand = {
    // Don't glob path for the write path. The contracts here are:
    // 1. Only one output path can be specified on the write path;
    // 2. Output path must be a legal HDFS style file system path;
@@ -482,9 +485,24 @@ case class DataSource(
  /**
   * Writes the given [[LogicalPlan]] out to this [[DataSource]] and returns a [[BaseRelation]] for
   * the following reading.
   *
   * @param mode The save mode for this writing.
   * @param data The input query plan that produces the data to be written. Note that this plan
   *             is analyzed and optimized.
   * @param outputColumns The original output columns of the input query plan. The optimizer may not
   *                      preserve the case of the output column names, so we need this parameter
   *                      instead of `data.output`.
   * @param physicalPlan The physical plan of the input query plan. We should run the writing
   *                     command with this physical plan instead of creating a new physical plan,
   *                     so that the metrics can be correctly linked to the given physical plan and
   *                     shown in the web UI.
   */
  def writeAndRead(mode: SaveMode, data: LogicalPlan): BaseRelation = {
    if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
  def writeAndRead(
      mode: SaveMode,
      data: LogicalPlan,
      outputColumns: Seq[Attribute],
      physicalPlan: SparkPlan): BaseRelation = {
    if (outputColumns.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
      throw new AnalysisException("Cannot save interval data type into external storage.")
    }
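To make the new contract concrete, here is a minimal, illustrative sketch of how a CTAS-style command could thread the planner-provided physical plan into `writeAndRead`. The command name and its fields are hypothetical; only `DataWritingCommand`, `writeAndRead`, and the parameter shapes come from the diff above.

```scala
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.execution.datasources.DataSource

// Hypothetical CTAS-like command. The planner hands `run` an already-planned
// physical `child`; forwarding that same object to `writeAndRead` keeps the
// write metrics attached to the plan the web UI is already tracking.
case class ExampleCtasCommand(
    dataSource: DataSource,
    mode: SaveMode,
    query: LogicalPlan,
    outputColumns: Seq[Attribute]) extends DataWritingCommand {

  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
    // Do NOT re-plan `query` with executePlan here; reuse `child` as-is.
    dataSource.writeAndRead(mode, query, outputColumns, child)
    Seq.empty[Row]
  }
}
```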
@@ -493,9 +511,23 @@ case class DataSource(
        dataSource.createRelation(
          sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
      case format: FileFormat =>
        sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, data)).toRdd
        val cmd = planForWritingFileFormat(format, mode, data)
        val resolvedPartCols = cmd.partitionColumns.map { col =>
          // The partition columns created in `planForWritingFileFormat` should always be
          // `UnresolvedAttribute` with a single name part.
          assert(col.isInstanceOf[UnresolvedAttribute])
          val unresolved = col.asInstanceOf[UnresolvedAttribute]
          assert(unresolved.nameParts.length == 1)
          val name = unresolved.nameParts.head
          outputColumns.find(a => equality(a.name, name)).getOrElse {
            throw new AnalysisException(
              s"Unable to resolve $name given [${data.output.map(_.name).mkString(", ")}]")
          }
        }
        val resolved = cmd.copy(partitionColumns = resolvedPartCols, outputColumns = outputColumns)
Review discussion on the line above:

Reviewer: Is this ad-hoc column resolution just to ensure the names have the correct case after it is possibly dropped by the optimizer? Why does the command need to report these and where are they used?

Author: The previous code calls […]

Reviewer: I get the point of passing the physical plan, and I think that's a good idea. What I don't understand is why the command doesn't match the physical plan that is passed in. Is that physical plan based on a different logical plan? I would expect that the physical plan is created once and passed into run, but that it was created from the logical plan that is also passed in.

Author: The given physical plan has been registered to the UI, and we can collect its metrics if we execute it. However if we run […]

Reviewer: I don't think you're answering the question that I'm asking. I understand why the physical plan is passed in. Why does the physical plan not match the command that is produced, or why doesn't the command here match the physical plan? I don't see why executePlan would produce something different than what is passed in.

Author: It matches! The only problem is, they are 2 different JVM objects. The UI keeps the physical plan object and displays them. An alternative solution is to swap the new physical plan into the UI part, but that's hard to do with the current UI framework. If we run […]

Reviewer: If it matches, then why is there column resolution happening?

Author: Now, I wanna run the given physical plan. Even though […]

Reviewer: Ok, I think I get it now. Thanks for explaining. Why not pass the […]

Author: Because the CTAS node itself doesn't have a […]
        resolved.run(sparkSession, physicalPlan)
        // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
        copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
        copy(userSpecifiedSchema = Some(outputColumns.toStructType.asNullable)).resolveRelation()
      case _ =>
        sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
    }
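The case-sensitivity point from the thread above, restated as a standalone sketch of the resolution step. `Resolver` is Catalyst's name-equality function (case-sensitive or not depending on `spark.sql.caseSensitive`); the helper name is made up for illustration, and a plain `IllegalArgumentException` stands in for the analysis error raised in the real code path.

```scala
import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.expressions.Attribute

// Resolve a single-part partition column name against the query's original output
// columns, so the final command carries attributes with the user-facing name case
// rather than whatever case the optimizer left in `data.output`.
def resolvePartitionColumn(
    col: UnresolvedAttribute,
    outputColumns: Seq[Attribute],
    equality: Resolver): Attribute = {
  require(col.nameParts.length == 1, s"Expected a single name part, got ${col.nameParts}")
  val name = col.nameParts.head
  outputColumns.find(a => equality(a.name, name)).getOrElse {
    throw new IllegalArgumentException(
      s"Unable to resolve $name given [${outputColumns.map(_.name).mkString(", ")}]")
  }
}
```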
@@ -20,10 +20,11 @@ package org.apache.spark.sql.hive.execution
import scala.util.control.NonFatal

import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand

/**
@@ -36,15 +37,15 @@ import org.apache.spark.sql.execution.command.RunnableCommand
case class CreateHiveTableAsSelectCommand(
    tableDesc: CatalogTable,
    query: LogicalPlan,
    outputColumns: Seq[Attribute],
    mode: SaveMode)
  extends RunnableCommand {
  extends DataWritingCommand {

  private val tableIdentifier = tableDesc.identifier

  override def innerChildren: Seq[LogicalPlan] = Seq(query)

  override def run(sparkSession: SparkSession): Seq[Row] = {
    if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    if (catalog.tableExists(tableIdentifier)) {
      assert(mode != SaveMode.Overwrite,
        s"Expect the table $tableIdentifier has been dropped when the save mode is Overwrite")
@@ -56,34 +57,36 @@ case class CreateHiveTableAsSelectCommand(
        return Seq.empty
      }

      sparkSession.sessionState.executePlan(
        InsertIntoTable(
          UnresolvedRelation(tableIdentifier),
          Map(),
          query,
          overwrite = false,
          ifPartitionNotExists = false)).toRdd
      InsertIntoHiveTable(
Review discussion on the line above:

Reviewer: @cloud-fan this change from […]

Reviewer: Will it affect web UI SQL tab?

Author: ah good catch! I don't think we can revert here, as we need to execute the physical plan given as a parameter. I think we should improve the hive table conversion optimizer rule, and handle CTAS as well.

Reviewer: Ok. I see. Thanks.
        tableDesc,
        Map.empty,
        query,
        overwrite = false,
        ifPartitionNotExists = false,
        outputColumns = outputColumns).run(sparkSession, child)
    } else {
      // TODO ideally, we should get the output data ready first and then
      // add the relation into catalog, just in case of failure occurs while data
      // processing.
      assert(tableDesc.schema.isEmpty)
      sparkSession.sessionState.catalog.createTable(
        tableDesc.copy(schema = query.schema), ignoreIfExists = false)
      catalog.createTable(tableDesc.copy(schema = query.schema), ignoreIfExists = false)

      try {
        sparkSession.sessionState.executePlan(
          InsertIntoTable(
            UnresolvedRelation(tableIdentifier),
            Map(),
            query,
            overwrite = true,
            ifPartitionNotExists = false)).toRdd
        // Read back the metadata of the table which was created just now.
        val createdTableMeta = catalog.getTableMetadata(tableDesc.identifier)
        // For CTAS, there is no static partition values to insert.
        val partition = createdTableMeta.partitionColumnNames.map(_ -> None).toMap
        InsertIntoHiveTable(
          createdTableMeta,
          partition,
          query,
          overwrite = true,
          ifPartitionNotExists = false,
          outputColumns = outputColumns).run(sparkSession, child)
      } catch {
        case NonFatal(e) =>
          // drop the created table.
          sparkSession.sessionState.catalog.dropTable(tableIdentifier, ignoreIfNotExists = true,
            purge = false)
          catalog.dropTable(tableIdentifier, ignoreIfNotExists = true, purge = false)
          throw e
      }
    }
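For background on why `run(sparkSession, child)` can simply execute the planner-provided `child`, here is a simplified, illustrative sketch of the exec-node pattern this relies on. The shape follows Spark's `DataWritingCommandExec`, but the class below is a stripped-down stand-in written for this discussion, not the actual implementation.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand

// A stand-in exec node: it exposes the command's metrics on itself (so the web UI,
// which tracks this physical plan, can display them) and runs the command exactly
// once against the already-planned child.
case class SimpleWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan)
  extends SparkPlan {

  override lazy val metrics = cmd.metrics

  private lazy val sideEffectResult: Seq[Row] = cmd.run(sqlContext.sparkSession, child)

  override def output: Seq[Attribute] = Seq.empty
  override def children: Seq[SparkPlan] = child :: Nil

  override protected def doExecute(): RDD[InternalRow] = {
    sideEffectResult  // force the write before returning an empty result
    sparkContext.emptyRDD[InternalRow]
  }
}
```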
@@ -128,32 +128,6 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
      "src")
  }

  test("SPARK-17409: The EXPLAIN output of CTAS only shows the analyzed plan") {
Author: This is kinda a "bad" test. The bug was that we optimized the CTAS input query twice, but here we are testing whether the EXPLAIN result of CTAS only contains the analyzed query, which is specific to how we fixed that bug at the time.
    withTempView("jt") {
      val ds = (1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""").toDS()
      spark.read.json(ds).createOrReplaceTempView("jt")
      val outputs = sql(
        s"""
           |EXPLAIN EXTENDED
           |CREATE TABLE t1
           |AS
           |SELECT * FROM jt
         """.stripMargin).collect().map(_.mkString).mkString

      val shouldContain =
        "== Parsed Logical Plan ==" :: "== Analyzed Logical Plan ==" :: "Subquery" ::
        "== Optimized Logical Plan ==" :: "== Physical Plan ==" ::
        "CreateHiveTableAsSelect" :: "InsertIntoHiveTable" :: "jt" :: Nil
      for (key <- shouldContain) {
        assert(outputs.contains(key), s"$key doesn't exist in result")
      }

      val physicalIndex = outputs.indexOf("== Physical Plan ==")
      assert(outputs.substring(physicalIndex).contains("Subquery"),
        "Physical Plan should contain SubqueryAlias since the query should not be optimized")
    }
  }

  test("explain output of physical plan should contain proper codegen stage ID") {
    checkKeywordsExist(sql(
      """
Author (general note on this PR): Generally I think it's hacky to analyze/optimize/plan/execute a query during the execution of another query. Not only CTAS; other commands like CreateView, CacheTable, etc. also have this issue. This is a surgical fix for Spark 2.3, so I didn't change this part and leave it for 2.4.