
Commit 401e270

AngersZhuuuu authored and cloud-fan committed
[SPARK-34567][SQL] CreateTableAsSelect should update metrics too
### What changes were proposed in this pull request?

For the `CreateTableAsSelect` command we use `InsertIntoHiveTable` or `InsertIntoHadoopFsRelationCommand` to insert the data, and their metrics are updated in `FileFormatWriter.write()`. However, only the `CreateTableAsSelectCommand` node is shown in the WebUI SQL tab, so we need to update `CreateTableAsSelectCommand`'s metrics too.

Before this PR:
![image](https://user-images.githubusercontent.com/46485123/109411226-81f44480-79db-11eb-99cb-b9686b15bf61.png)

After this PR:
![image](https://user-images.githubusercontent.com/46485123/109411232-8ae51600-79db-11eb-9111-3bea0bc2d475.png)
![image](https://user-images.githubusercontent.com/46485123/109905192-62aa2f80-7cd9-11eb-91f9-04b16c9238ae.png)

### Why are the changes needed?

To complete the SQL metrics.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added unit tests in the sql/core and sql/hive `SQLMetricsSuite`s (see the diffs below).

Closes #31679 from AngersZhuuuu/SPARK-34567.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
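At its core, the change is a small driver-side propagation step: once the child insert command has finished, its `SQLMetric` values are copied into the CTAS command's own metrics and re-posted so the SQL tab picks them up. Here is a minimal, self-contained sketch of that pattern, assuming a local Spark build; the object name `PropagateMetricsSketch` and the hard-coded 42-row update are illustrative only, while the real implementation is the `propogateMetrics` helper in the first diff below:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Illustrative sketch of the metric-propagation pattern introduced by this commit.
object PropagateMetricsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    val sc = spark.sparkContext

    // Parent ("CTAS") and child ("insert") metrics; in Spark these live on the commands.
    val parent = Map("numOutputRows" -> SQLMetrics.createMetric(sc, "number of output rows"))
    val child = Map("numOutputRows" -> SQLMetrics.createMetric(sc, "number of output rows"))

    child("numOutputRows").add(42L) // stand-in for FileFormatWriter updating the child

    // Step 1: copy the child's values into the parent's metrics.
    child.foreach { case (key, metric) => parent(key).set(metric.value) }
    // Step 2: re-post the parent's metrics for the current SQL execution
    // (a no-op here, since no execution id is set outside a real query).
    SQLMetrics.postDriverMetricUpdates(sc,
      sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY), parent.values.toSeq)

    assert(parent("numOutputRows").value == 42L)
    spark.stop()
  }
}
```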
1 parent 2b1c170 commit 401e270

File tree

6 files changed: +76 −4 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala

Lines changed: 25 additions & 2 deletions
```diff
@@ -19,12 +19,13 @@ package org.apache.spark.sql.execution.command
 
 import org.apache.hadoop.conf.Configuration
 
+import org.apache.spark.SparkContext
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
-import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.util.SerializableConfiguration
 
 /**
@@ -73,4 +74,26 @@ object DataWritingCommand {
       attr.withName(outputName)
     }
   }
+
+  /**
+   * When executing CTAS operators, Spark uses [[InsertIntoHadoopFsRelationCommand]]
+   * or [[InsertIntoHiveTable]] to write the data. Both inherit their metrics from
+   * [[DataWritingCommand]], but after they run, only their own metrics have been
+   * updated (through [[BasicWriteJobStatsTracker]]), so we also need to propagate
+   * the metrics to the command that actually invoked [[InsertIntoHadoopFsRelationCommand]]
+   * or [[InsertIntoHiveTable]].
+   *
+   * @param sparkContext Current SparkContext.
+   * @param command Command that executed the write.
+   * @param metrics Metrics of the real DataWritingCommand.
+   */
+  def propogateMetrics(
+      sparkContext: SparkContext,
+      command: DataWritingCommand,
+      metrics: Map[String, SQLMetric]): Unit = {
+    command.metrics.foreach { case (key, metric) => metrics(key).set(metric.value) }
+    SQLMetrics.postDriverMetricUpdates(sparkContext,
+      sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
+      metrics.values.toSeq)
+  }
 }
```
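For orientation, a hedged sketch of how a CTAS-style command is expected to use the new helper: run the real writing command, then propagate its metrics into your own. `CtasLikeCommand` is hypothetical; the datasource and Hive diffs below are the actual call sites:

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand

// Hypothetical CTAS-like command that delegates the write to an inner command
// and then propagates the inner command's metrics into its own `metrics` map.
case class CtasLikeCommand(
    query: LogicalPlan,
    outputColumnNames: Seq[String],
    inner: DataWritingCommand) extends DataWritingCommand {

  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
    inner.run(sparkSession, child) // performs the actual write, updating inner.metrics
    // The helper added by this commit: copy inner.metrics into this.metrics
    // and post a driver-side metric update for the current execution.
    DataWritingCommand.propogateMetrics(sparkSession.sparkContext, inner, metrics)
    Seq.empty
  }
}
```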

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -220,7 +220,7 @@ case class CreateDataSourceTableAsSelectCommand(
       catalogTable = if (tableExists) Some(table) else None)
 
     try {
-      dataSource.writeAndRead(mode, query, outputColumnNames, physicalPlan)
+      dataSource.writeAndRead(mode, query, outputColumnNames, physicalPlan, metrics)
     } catch {
       case ex: AnalysisException =>
         logError(s"Failed to write to table ${table.identifier.unquotedString}", ex)
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 4 additions & 1 deletion
```diff
@@ -43,6 +43,7 @@ import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
+import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.{RateStreamProvider, TextSocketSourceProvider}
 import org.apache.spark.sql.internal.SQLConf
@@ -518,7 +519,8 @@ case class DataSource(
       mode: SaveMode,
       data: LogicalPlan,
       outputColumnNames: Seq[String],
-      physicalPlan: SparkPlan): BaseRelation = {
+      physicalPlan: SparkPlan,
+      metrics: Map[String, SQLMetric]): BaseRelation = {
     val outputColumns = DataWritingCommand.logicalPlanOutputWithNames(data, outputColumnNames)
     if (outputColumns.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
       throw new AnalysisException("Cannot save interval data type into external storage.")
@@ -546,6 +548,7 @@ case class DataSource(
           partitionColumns = resolvedPartCols,
           outputColumnNames = outputColumnNames)
         resolved.run(sparkSession, physicalPlan)
+        DataWritingCommand.propogateMetrics(sparkSession.sparkContext, resolved, metrics)
         // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
         copy(userSpecifiedSchema = Some(outputColumns.toStructType.asNullable)).resolveRelation()
       case _ =>
```

sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala

Lines changed: 17 additions & 0 deletions
```diff
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.execution.{FilterExec, RangeExec, SparkPlan, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
 import org.apache.spark.sql.functions._
@@ -782,4 +783,20 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
       }
     }
   }
+
+  test("SPARK-34567: Add metrics for CTAS operator") {
+    withTable("t") {
+      val df = sql("CREATE TABLE t USING PARQUET AS SELECT 1 as a")
+      val dataWritingCommandExec =
+        df.queryExecution.executedPlan.asInstanceOf[DataWritingCommandExec]
+      dataWritingCommandExec.executeCollect()
+      val createTableAsSelect = dataWritingCommandExec.cmd
+      assert(createTableAsSelect.metrics.contains("numFiles"))
+      assert(createTableAsSelect.metrics("numFiles").value == 1)
+      assert(createTableAsSelect.metrics.contains("numOutputBytes"))
+      assert(createTableAsSelect.metrics("numOutputBytes").value > 0)
+      assert(createTableAsSelect.metrics.contains("numOutputRows"))
+      assert(createTableAsSelect.metrics("numOutputRows").value == 1)
+    }
+  }
 }
```
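The same check can be done interactively; a hedged spark-shell sketch, assuming a build that includes this patch (the table name `t_demo` is arbitrary):

```scala
import org.apache.spark.sql.execution.command.DataWritingCommandExec

val df = spark.sql("CREATE TABLE t_demo USING PARQUET AS SELECT 1 AS a")
val exec = df.queryExecution.executedPlan.asInstanceOf[DataWritingCommandExec]
exec.cmd.metrics.foreach { case (name, metric) =>
  println(s"$name = ${metric.value}") // expect numFiles = 1, numOutputRows = 1, ...
}
```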

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -55,6 +55,7 @@ trait CreateHiveTableAsSelectBase extends DataWritingCommand {
 
       val command = getWritingCommand(catalog, tableDesc, tableExists = true)
       command.run(sparkSession, child)
+      DataWritingCommand.propogateMetrics(sparkSession.sparkContext, command, metrics)
     } else {
       // TODO ideally, we should get the output data ready first and then
       // add the relation into catalog, just in case of failure occurs while data
@@ -69,6 +70,7 @@ trait CreateHiveTableAsSelectBase extends DataWritingCommand {
         val createdTableMeta = catalog.getTableMetadata(tableDesc.identifier)
         val command = getWritingCommand(catalog, createdTableMeta, tableExists = false)
         command.run(sparkSession, child)
+        DataWritingCommand.propogateMetrics(sparkSession.sparkContext, command, metrics)
       } catch {
         case NonFatal(e) =>
           // drop the created table.
```

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala

Lines changed: 27 additions & 0 deletions
```diff
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.metric.SQLMetricsTestUtils
+import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 
 // Disable AQE because metric info is different with AQE on/off
@@ -34,4 +36,29 @@ class SQLMetricsSuite extends SQLMetricsTestUtils with TestHiveSingleton
       testMetricsDynamicPartition("hive", "hive", "t1")
     }
   }
+
+  test("SPARK-34567: Add metrics for CTAS operator") {
+    Seq(false, true).foreach { canOptimized =>
+      withSQLConf(HiveUtils.CONVERT_METASTORE_CTAS.key -> canOptimized.toString) {
+        withTable("t") {
+          val df = sql(s"CREATE TABLE t STORED AS PARQUET AS SELECT 1 as a")
+          val dataWritingCommandExec =
+            df.queryExecution.executedPlan.asInstanceOf[DataWritingCommandExec]
+          dataWritingCommandExec.executeCollect()
+          val createTableAsSelect = dataWritingCommandExec.cmd
+          if (canOptimized) {
+            assert(createTableAsSelect.isInstanceOf[OptimizedCreateHiveTableAsSelectCommand])
+          } else {
+            assert(createTableAsSelect.isInstanceOf[CreateHiveTableAsSelectCommand])
+          }
+          assert(createTableAsSelect.metrics.contains("numFiles"))
+          assert(createTableAsSelect.metrics("numFiles").value == 1)
+          assert(createTableAsSelect.metrics.contains("numOutputBytes"))
+          assert(createTableAsSelect.metrics("numOutputBytes").value > 0)
+          assert(createTableAsSelect.metrics.contains("numOutputRows"))
+          assert(createTableAsSelect.metrics("numOutputRows").value == 1)
+        }
+      }
+    }
+  }
 }
```
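The `canOptimized` flag in this test maps to a real configuration: with metastore CTAS conversion enabled (the default), a Hive CTAS is planned as `OptimizedCreateHiveTableAsSelectCommand`, which writes through `InsertIntoHadoopFsRelationCommand`; with it disabled, the `CreateHiveTableAsSelectCommand`/`InsertIntoHiveTable` path is used. Either way, the CTAS node now reports metrics. A sketch of the toggle from user code:

```scala
// Choose which Hive CTAS command gets planned; both paths now report metrics.
spark.conf.set("spark.sql.hive.convertMetastoreCtas", "false") // CreateHiveTableAsSelectCommand
spark.conf.set("spark.sql.hive.convertMetastoreCtas", "true")  // OptimizedCreateHiveTableAsSelectCommand
```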
