[SPARK-26327][SQL][BACKPORT-2.3] Bug fix for FileSourceScanExec metrics update #23299

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
```diff
@@ -183,19 +183,14 @@ case class FileSourceScanExec(
     partitionSchema = relation.partitionSchema,
     relation.sparkSession.sessionState.conf)
 
+  private var metadataTime = 0L
+
   @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
     val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
     val startTime = System.nanoTime()
     val ret = relation.location.listFiles(partitionFilters, dataFilters)
     val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
-
-    metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
-    metrics("metadataTime").add(timeTakenMs)
-
-    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
-      metrics("numFiles") :: metrics("metadataTime") :: Nil)
-
+    metadataTime = timeTakenMs
     ret
   }
```
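The heart of the change is in the hunk above: the old code posted the numFiles and metadataTime updates from inside the selectedPartitions lazy val. A lazy val's body runs exactly once, on first access, and when that first access happens during planning, before the spark.sql.execution.id property is set on the driver thread, the posted update has no running execution to attach to, so the metrics can show up as zero in the UI (see SPARK-26327). The fix therefore only records metadataTime here and defers the posting. Below is a minimal plain-Scala sketch of that timing hazard; every name in it is an illustrative stand-in, not a Spark API:

```scala
// Sketch of the lazy-val timing hazard behind SPARK-26327.
// All names here are hypothetical stand-ins, not Spark APIs.
object LazyValTiming {
  // Stand-in for the spark.sql.execution.id thread-local property.
  var currentExecutionId: Option[Long] = None

  lazy val selectedPartitions: Seq[String] = {
    // The old code posted driver metrics at this point. If the first access
    // happens before an execution id exists, the update is silently dropped.
    println(s"posting metrics under execution id: $currentExecutionId")
    Seq("p=0", "p=1", "p=2")
  }

  def main(args: Array[String]): Unit = {
    selectedPartitions                 // planning-time access: prints None
    currentExecutionId = Some(42L)
    selectedPartitions                 // cached now; the body never runs again
  }
}
```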

```diff
@@ -293,6 +288,8 @@ case class FileSourceScanExec(
   }
 
   private lazy val inputRDD: RDD[InternalRow] = {
+    // Update metrics so they take effect in both the code generation node and the normal node.
+    updateDriverMetrics()
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,
```
```diff
@@ -500,6 +497,19 @@ case class FileSourceScanExec(
     }
   }
 
+  /**
+   * Sends the updated metrics to the driver. By the time this is called,
+   * selectedPartitions has already been initialized. See SPARK-26327 for more detail.
+   */
+  private def updateDriverMetrics() = {
+    metrics("numFiles").add(selectedPartitions.map(_.files.size.toLong).sum)
+    metrics("metadataTime").add(metadataTime)
+
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+      metrics("numFiles") :: metrics("metadataTime") :: Nil)
+  }
+
   override def doCanonicalize(): FileSourceScanExec = {
     FileSourceScanExec(
       relation,
```
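Because updateDriverMetrics is invoked from inputRDD, it now runs while the query is executing, which is exactly when SQLExecution.EXECUTION_ID_KEY is set as a local property on the driver thread. Below is a hedged sketch of the posting pattern the diff relies on, assuming a local Spark 2.3.x build where these internal APIs (SQLMetrics.createMetric, SQLMetrics.postDriverMetricUpdates) are on the classpath; it mirrors the pattern rather than reproducing FileSourceScanExec itself:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.metric.SQLMetrics

// Sketch only: uses Spark-internal APIs, assuming a Spark 2.3.x classpath.
object DriverMetricSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
    val sc = spark.sparkContext

    // A driver-side metric, like "number of files" in FileSourceScanExec.
    val numFiles = SQLMetrics.createMetric(sc, "number of files")
    numFiles.add(2L)

    // The execution id is only set on the driver thread while a query runs.
    // Here, outside any query, it is null and the post below is a no-op,
    // which is the failure mode this PR fixes for planning-time updates.
    val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
    SQLMetrics.postDriverMetricUpdates(sc, executionId, numFiles :: Nil)

    spark.stop()
  }
}
```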
The backport also adds a regression test in SQLMetricsSuite:
```diff
@@ -504,4 +504,19 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with SharedSQLContext
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("SPARK-26327: FileSourceScanExec metrics") {
+    withTable("testDataForScan") {
+      spark.range(10).selectExpr("id", "id % 3 as p")
+        .write.partitionBy("p").saveAsTable("testDataForScan")
+      // The execution plan only has 1 FileScan node.
+      val df = spark.sql(
+        "SELECT * FROM testDataForScan WHERE p = 1")
+      testSparkPlanMetrics(df, 1, Map(
+        0L -> (("Scan parquet default.testdataforscan", Map(
+          "number of output rows" -> 3L,
+          "number of files" -> 2L))))
+      )
+    }
+  }
 }
```
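The expected values follow from the data the test writes: with p = id % 3 over ids 0 through 9, partition p=1 holds exactly ids 1, 4, and 7, hence 3 output rows; the 2-file count presumably reflects that partition being written by two tasks under this suite's configuration. A quick plain-Scala check of the row-count arithmetic:

```scala
// Quick check of the expected "number of output rows" metric:
// p = id % 3 puts ids 1, 4 and 7 into partition p=1.
object RowCountCheck extends App {
  val idsInP1 = (0L until 10L).filter(_ % 3 == 1)
  assert(idsInP1 == Seq(1L, 4L, 7L)) // 3 rows, matching the test's expectation
  println(s"rows in p=1: ${idsInP1.size}")
}
```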