[SPARK-26327][SQL] Bug fix for FileSourceScanExec metrics update and name changing #23277

@@ -167,19 +167,14 @@ case class FileSourceScanExec(
partitionSchema = relation.partitionSchema,
relation.sparkSession.sessionState.conf)

private var fileListingTime = 0L

@transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
val startTime = System.nanoTime()
val ret = relation.location.listFiles(partitionFilters, dataFilters)
val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000

metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
metrics("metadataTime").add(timeTakenMs)

val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
metrics("numFiles") :: metrics("metadataTime") :: Nil)

Member

@xuanyuanking, could you check which Spark versions have this bug? This code was introduced in Spark 2.2.0.

Member Author

All versions after 2.2.0 have this bug. This PR fixes the corner case (in fact, every case that goes through sql()) in this commit. If we test with:

spark.read.parquet($filepath)

the metrics show up correctly, because relation.partitionSchemaOption is None and the initialization of selectedPartitions is not triggered in the wrong place. But if we test as in the added UT:

sql("select * from table partition=xx")

the metrics go wrong.
I have made the JIRA description and title more detailed; please help me check whether the explanation is clear. Thanks :)
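
For illustration only (not part of this PR), a minimal Scala sketch of the two scenarios described above, assuming a local SparkSession; the path and table name are made up:

// Hypothetical repro sketch; paths and table names are illustrative.
val spark = org.apache.spark.sql.SparkSession.builder()
  .master("local[*]").appName("SPARK-26327-repro").getOrCreate()

// Case 1: direct file read, no partition schema -- metrics were already reported correctly.
spark.range(10).write.mode("overwrite").parquet("/tmp/spark26327/flat")
spark.read.parquet("/tmp/spark26327/flat").collect()

// Case 2: SQL over a partitioned table, as in the added UT -- the case whose metrics went wrong.
spark.range(10).selectExpr("id", "id % 3 as p")
  .write.mode("overwrite").partitionBy("p").saveAsTable("t")
spark.sql("SELECT * FROM t WHERE p = 1").collect()
// Compare the scan node's "number of files" and file listing time in the SQL UI for each case.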

fileListingTime = timeTakenMs
ret
}

@@ -291,6 +286,8 @@ case class FileSourceScanExec(
}

private lazy val inputRDD: RDD[InternalRow] = {
// Update metrics here so that they take effect in both the code generation path and the normal execution path.
updateDriverMetrics()
val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
@@ -316,7 +313,7 @@
override lazy val metrics =
Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files"),
"metadataTime" -> SQLMetrics.createMetric(sparkContext, "metadata time (ms)"),
"fileListingTime" -> SQLMetrics.createMetric(sparkContext, "file listing time (ms)"),
dongjoon-hyun (Member) Dec 11, 2018

Should we change this metric name? The new name has a clearer meaning, but the PR title claims only to change the update location, not the naming.

Member

Yea, please fix PR description and title accordingly.

Member Author

Thanks, the PR description and title changes are done.

"scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))

protected override def doExecute(): RDD[InternalRow] = {
@@ -507,6 +504,19 @@ case class FileSourceScanExec(
}
}

/**
* Send the updated metrics to the driver. By the time this method is called,
* selectedPartitions has already been initialized. See SPARK-26327 for more detail.
*/
private def updateDriverMetrics() = {
metrics("numFiles").add(selectedPartitions.map(_.files.size.toLong).sum)
metrics("fileListingTime").add(fileListingTime)

val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
metrics("numFiles") :: metrics("fileListingTime") :: Nil)
}

override def doCanonicalize(): FileSourceScanExec = {
FileSourceScanExec(
relation,
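As context for updateDriverMetrics() above: driver-side metrics are not shipped back by tasks, so they have to be added locally and posted explicitly. A minimal sketch of that pattern, using Spark's internal SQLMetrics/SQLExecution APIs (illustrative only, not a stable public interface):

import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Post metrics that were updated on the driver, tagged with the current execution id
// so the SQL UI can attribute them to the running query. updateDriverMetrics() does
// this for "numFiles" and "fileListingTime" once inputRDD is built.
def postDriverSideMetrics(sc: SparkContext, updated: Seq[SQLMetric]): Unit = {
  val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
  SQLMetrics.postDriverMetricUpdates(sc, executionId, updated)
}
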
@@ -636,4 +636,19 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
assert(filters.head.metrics("numOutputRows").value == 1)
}
}

test("SPARK-26327: FileSourceScanExec metrics") {
withTable("testDataForScan") {
spark.range(10).selectExpr("id", "id % 3 as p")
.write.partitionBy("p").saveAsTable("testDataForScan")
// The execution plan only has 1 FileScan node.
val df = spark.sql(
"SELECT * FROM testDataForScan WHERE p = 1")
testSparkPlanMetrics(df, 1, Map(
0L -> (("Scan parquet default.testdataforscan", Map(
"number of output rows" -> 3L,
"number of files" -> 2L))))
)
}
}
}
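
Beyond SQLMetricsTestUtils, the same metrics can be spot-checked by hand in a spark-shell. A rough sketch (it pokes at internal plan classes, so details may differ across Spark versions; metric keys follow this patch's naming):

import org.apache.spark.sql.execution.FileSourceScanExec

val df = spark.sql("SELECT * FROM testDataForScan WHERE p = 1")
df.collect()  // run the query so the driver-side metrics are posted

val scan = df.queryExecution.executedPlan.collectLeaves()
  .collectFirst { case f: FileSourceScanExec => f }.get
println(scan.metrics("numFiles").value)
println(scan.metrics("fileListingTime").value)
println(scan.metrics("numOutputRows").value)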