
Commit bd7df6b

xuanyuanking authored and cloud-fan committed
[SPARK-26327][SQL] Bug fix for FileSourceScanExec metrics update and name changing
## What changes were proposed in this pull request?

As described in [SPARK-26327](https://issues.apache.org/jira/browse/SPARK-26327), this bug was caused by `postDriverMetricUpdates` being called in the wrong place. Fix it by splitting the initialization of `selectedPartitions` from the metrics-updating logic, and move the update into the initialization of `inputRDD`, where it takes effect in both the code-generation node and the normal node. Also rename `metadataTime` to `fileListingTime` for a clearer meaning.

## How was this patch tested?

New test case in `SQLMetricsSuite`.

Manual test:

|         | Before | After |
|---------|:------:|:-----:|
| CodeGen | ![image](https://user-images.githubusercontent.com/4833765/49741753-13c7e800-fcd2-11e8-97a8-8057b657aa3c.png) | ![image](https://user-images.githubusercontent.com/4833765/49741774-1f1b1380-fcd2-11e8-98d9-78b950f4e43a.png) |
| Normal  | ![image](https://user-images.githubusercontent.com/4833765/49741836-378b2e00-fcd2-11e8-80c3-ab462a6a3184.png) | ![image](https://user-images.githubusercontent.com/4833765/49741860-4a056780-fcd2-11e8-9ef1-863de217f183.png) |

Closes #23277 from xuanyuanking/SPARK-26327.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
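To make the bug pattern concrete, here is a schematic sketch (hypothetical names, not Spark's real classes): posting metrics as a side effect of a `lazy val` ties the update to whichever caller forces the val first, which may happen outside the tracked execution, whereas posting from the point where the input RDD is built runs under the actual execution in both code paths.

```scala
// Schematic sketch of the fix, with hypothetical names.
object MetricPlacementSketch extends App {
  class Scan {
    private var fileListingTime = 0L

    // Before: posting from inside the lazy val ties the update to whichever
    // caller forces it first -- possibly outside the tracked execution.
    lazy val selectedPartitionsBuggy: Seq[String] = {
      val start = System.nanoTime()
      val ret = listFiles()
      postToDriver((System.nanoTime() - start) / 1000000) // wrong place
      ret
    }

    // After: the lazy val only records the timing ...
    lazy val selectedPartitions: Seq[String] = {
      val start = System.nanoTime()
      val ret = listFiles()
      fileListingTime = (System.nanoTime() - start) / 1000000
      ret
    }

    // ... and the update is posted when the input RDD is built, which happens
    // in both the whole-stage-codegen path and the normal execution path.
    def buildInputRDD(): Unit = {
      selectedPartitions // forces the file listing if not already done
      postToDriver(fileListingTime)
    }

    private def listFiles(): Seq[String] = Seq("part-0", "part-1")
    private def postToDriver(ms: Long): Unit = println(s"file listing took $ms ms")
  }

  new Scan().buildInputRDD()
}
```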
1 parent 4e1d859 commit bd7df6b

2 files changed (+34, -9)

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala (19 additions & 9 deletions)

```diff
@@ -167,19 +167,14 @@ case class FileSourceScanExec(
       partitionSchema = relation.partitionSchema,
       relation.sparkSession.sessionState.conf)
 
+  private var fileListingTime = 0L
+
   @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
     val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
     val startTime = System.nanoTime()
     val ret = relation.location.listFiles(partitionFilters, dataFilters)
     val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
-
-    metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
-    metrics("metadataTime").add(timeTakenMs)
-
-    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
-      metrics("numFiles") :: metrics("metadataTime") :: Nil)
-
+    fileListingTime = timeTakenMs
     ret
   }
 
@@ -291,6 +286,8 @@
   }
 
   private lazy val inputRDD: RDD[InternalRow] = {
+    // Update metrics for taking effect in both code generation node and normal node.
+    updateDriverMetrics()
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,
@@ -316,7 +313,7 @@
   override lazy val metrics =
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
       "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files"),
-      "metadataTime" -> SQLMetrics.createMetric(sparkContext, "metadata time (ms)"),
+      "fileListingTime" -> SQLMetrics.createMetric(sparkContext, "file listing time (ms)"),
       "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
 
   protected override def doExecute(): RDD[InternalRow] = {
@@ -507,6 +504,19 @@
     }
   }
 
+  /**
+   * Send the updated metrics to the driver. By the time this function is called,
+   * selectedPartitions has already been initialized. See SPARK-26327 for more detail.
+   */
+  private def updateDriverMetrics() = {
+    metrics("numFiles").add(selectedPartitions.map(_.files.size.toLong).sum)
+    metrics("fileListingTime").add(fileListingTime)
+
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+      metrics("numFiles") :: metrics("fileListingTime") :: Nil)
+  }
+
   override def doCanonicalize(): FileSourceScanExec = {
     FileSourceScanExec(
       relation,
```
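For readers tracing the mechanism: `postDriverMetricUpdates` only reaches the SQL UI when the execution-id local property is set, which is why posting from `inputRDD` (forced during actual execution) works where the side-effecting lazy `selectedPartitions` did not. Below is a minimal hedged sketch of that wiring; `SQLMetrics` and `SQLExecution` are Spark-internal APIs used exactly as they appear in the diff above, while the standalone harness around them is illustrative only.

```scala
// Hedged sketch of the driver-side metric posting used by updateDriverMetrics().
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.metric.SQLMetrics

object DriverMetricSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
    val sc = spark.sparkContext

    val fileListingTime = SQLMetrics.createMetric(sc, "file listing time (ms)")
    fileListingTime.add(42L)

    // EXECUTION_ID_KEY is a thread-local property set only while a query runs
    // under SQLExecution.withNewExecutionId; outside of one it is null and the
    // posted update is dropped, which was the visible symptom of SPARK-26327.
    val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
    SQLMetrics.postDriverMetricUpdates(sc, executionId, fileListingTime :: Nil)

    spark.stop()
  }
}
```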

sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala (15 additions & 0 deletions)

```diff
@@ -636,4 +636,19 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       assert(filters.head.metrics("numOutputRows").value == 1)
     }
   }
+
+  test("SPARK-26327: FileSourceScanExec metrics") {
+    withTable("testDataForScan") {
+      spark.range(10).selectExpr("id", "id % 3 as p")
+        .write.partitionBy("p").saveAsTable("testDataForScan")
+      // The execution plan only has 1 FileScan node.
+      val df = spark.sql(
+        "SELECT * FROM testDataForScan WHERE p = 1")
+      testSparkPlanMetrics(df, 1, Map(
+        0L -> (("Scan parquet default.testdataforscan", Map(
+          "number of output rows" -> 3L,
+          "number of files" -> 2L))))
+      )
+    }
+  }
 }
```
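As a quick sanity check outside the test harness, the renamed metric can also be read directly off the executed plan. A hedged spark-shell sketch follows; it assumes the scan is the leaf of the executed plan (true for this single-scan query, including under whole-stage codegen), and the exact file count depends on how many tasks wrote each partition.

```scala
// Hedged spark-shell sketch: inspect FileSourceScanExec metrics by hand.
spark.range(10).selectExpr("id", "id % 3 as p")
  .write.partitionBy("p").saveAsTable("testDataForScan")

val df = spark.sql("SELECT * FROM testDataForScan WHERE p = 1")
df.collect()  // run the query so the scan metrics are populated

// The metrics map uses the renamed "fileListingTime" key from this commit.
val scan = df.queryExecution.executedPlan.collectLeaves().head
println(scan.metrics("numFiles").value)          // 2 in the test above
println(scan.metrics("fileListingTime").value)   // file listing time in ms
```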
