[SPARK-26327][SQL] Bug fix for FileSourceScanExec metrics update and name changing #23277
Changes from all commits
```diff
@@ -167,19 +167,14 @@ case class FileSourceScanExec(
       partitionSchema = relation.partitionSchema,
       relation.sparkSession.sessionState.conf)
 
+  private var fileListingTime = 0L
+
   @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
     val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
     val startTime = System.nanoTime()
     val ret = relation.location.listFiles(partitionFilters, dataFilters)
     val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
-
-    metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
-    metrics("metadataTime").add(timeTakenMs)
-
-    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
-      metrics("numFiles") :: metrics("metadataTime") :: Nil)
-
+    fileListingTime = timeTakenMs
     ret
   }
```
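The reason for pulling the metric updates out of `selectedPartitions` is the initialization timing of a `lazy val`. A minimal, self-contained sketch (plain Scala, not Spark code; all names hypothetical) of how a side effect buried in a lazy val fires at first access, which may be long before the point where the effect is meaningful:

```scala
object LazySideEffectDemo {
  var executionStarted = false

  lazy val partitions: Seq[String] = {
    // Side effect tied to initialization, like posting driver metric updates.
    println(s"metrics posted (executionStarted = $executionStarted)")
    Seq("p=0", "p=1")
  }

  def main(args: Array[String]): Unit = {
    partitions.size          // forced during "planning": prints executionStarted = false
    executionStarted = true  // the real "execution" only begins here
    partitions.size          // already initialized; the side effect never fires again
  }
}
```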
```diff
@@ -291,6 +286,8 @@ case class FileSourceScanExec(
   }
 
   private lazy val inputRDD: RDD[InternalRow] = {
+    // Update metrics so they take effect in both the code generation node and the normal node.
+    updateDriverMetrics()
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,
```
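Calling `updateDriverMetrics()` at the top of `inputRDD` works because both execution paths consume the same lazy RDD. A hedged sketch of that pattern (hypothetical class, plain Scala collections standing in for RDDs):

```scala
class SharedLazyInitDemo {
  private def updateDriverMetrics(): Unit =
    println("driver metrics posted exactly once")

  // Both entry points below share this lazy value, so the side effect in its
  // initializer runs once, whichever path touches the scan first.
  private lazy val inputRDD: Seq[Int] = {
    updateDriverMetrics()
    Seq(1, 2, 3)
  }

  def doExecute(): Seq[Int] = inputRDD             // "normal" execution path
  def inputRDDs(): Seq[Seq[Int]] = Seq(inputRDD)   // whole-stage codegen path
}
```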
```diff
@@ -316,7 +313,7 @@ case class FileSourceScanExec(
   override lazy val metrics =
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
       "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files"),
-      "metadataTime" -> SQLMetrics.createMetric(sparkContext, "metadata time (ms)"),
+      "fileListingTime" -> SQLMetrics.createMetric(sparkContext, "file listing time (ms)"),
       "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
 
   protected override def doExecute(): RDD[InternalRow] = {
```

Review thread on the metric rename:

Reviewer: Should we change this metric name? It has a clearer meaning, but the PR title is claiming only to change the metrics update.

Reviewer: Yea, please fix the PR description and title accordingly.

Author: Thanks, the PR description and title change is done.
```diff
@@ -507,6 +504,19 @@ case class FileSourceScanExec(
     }
   }
 
+  /**
+   * Send the updated metrics to the driver. By the time this function is called,
+   * selectedPartitions has already been initialized. See SPARK-26327 for more detail.
+   */
+  private def updateDriverMetrics() = {
+    metrics("numFiles").add(selectedPartitions.map(_.files.size.toLong).sum)
+    metrics("fileListingTime").add(fileListingTime)
+
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+      metrics("numFiles") :: metrics("fileListingTime") :: Nil)
+  }
+
   override def doCanonicalize(): FileSourceScanExec = {
     FileSourceScanExec(
       relation,
```
Review thread:

Reviewer: @xuanyuanking. Could you check which Spark versions have this bug? This code was introduced in Spark 2.2.0.

Author: All versions after 2.2.0 have this bug; this PR fixes the corner case (though it affects all cases issued through SQL). If we test a read where relation.partitionSchemaOption is None, the metrics show correctly, because the initialization of selectedPartitions is not triggered in the wrong place. But if we test as in the added UT, the metrics go wrong. I changed the JIRA description and title to be more detailed; please help me check whether I explained it clearly. Thanks :)
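For illustration, a sketch of the kind of query that hits the broken path: a partitioned table read through SQL (table name, values, and the local session setup are assumptions, not the exact UT added by this PR):

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: reading a *partitioned* table via SQL makes the partition schema
// non-empty, so selectedPartitions is forced during planning -- the spot where
// the old code posted (and lost) its driver metric updates.
object Spark26327Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()

    spark.range(10).selectExpr("id", "id % 3 as p")
      .write.partitionBy("p").saveAsTable("testDataForScan")

    // With the bug, the scan node's "number of files" and file listing time
    // show up as 0 in the SQL UI for this query; with the fix they are set.
    spark.sql("SELECT * FROM testDataForScan WHERE p = 1").collect()

    spark.stop()
  }
}
```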