chore: Add custom metric for native shuffle fetching batches from JVM #1108
Changes from all commits: 20aa89c, 9f4aa82, 397c5c4, 4c29a99, f65e91f, c686f1d, b5b19b3, 1fc34cd, 9adcad8, 5f86bc9
```diff
@@ -77,6 +77,9 @@ case class CometShuffleExchangeExec(
     SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)

   override lazy val metrics: Map[String, SQLMetric] = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
+    "jvm_fetch_time" -> SQLMetrics.createNanoTimingMetric(
+      sparkContext,
+      "time fetching batches from JVM"),
     "numPartitions" -> SQLMetrics.createMetric(
       sparkContext,
       "number of partitions")) ++ readMetrics ++ writeMetrics
```
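The new `jvm_fetch_time` entry is created with Spark's `SQLMetrics.createNanoTimingMetric`, i.e. a metric that accumulates elapsed nanoseconds. As a minimal sketch of that accumulation pattern without any Spark dependency (`NanoTimingMetric` here is a hypothetical stand-in, not a Spark or Comet class):

```scala
// Hypothetical sketch of a nano-timing metric: accumulate elapsed
// nanoseconds around each timed operation, conceptually what a metric
// backed by SQLMetrics.createNanoTimingMetric records.
class NanoTimingMetric(val name: String) {
  private var totalNanos: Long = 0L

  def add(nanos: Long): Unit = totalNanos += nanos

  // Time a block of work and fold its duration into the metric.
  def timed[T](body: => T): T = {
    val start = System.nanoTime()
    try body
    finally add(System.nanoTime() - start)
  }

  def value: Long = totalNanos
}

val jvmFetchTime = new NanoTimingMetric("time fetching batches from JVM")
val result = jvmFetchTime.timed { (1 to 3).sum }
```

In the real operator the timed body would be the call that pulls the next batch over from the JVM side, so the metric sums fetch time across the whole partition.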
```diff
@@ -480,7 +483,14 @@ class CometShuffleWriteProcessor(
       "output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN),
       "data_size" -> metrics("dataSize"),
       "elapsed_compute" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME))
-    val nativeMetrics = CometMetricNode(nativeSQLMetrics)
+
+    val nativeMetrics = if (metrics.contains("jvm_fetch_time")) {
+      CometMetricNode(
+        nativeSQLMetrics ++ Map("jvm_fetch_time" ->
+          metrics("jvm_fetch_time")))
+    } else {
+      CometMetricNode(nativeSQLMetrics)
+    }

     // Getting rid of the fake partitionId
     val newInputs = inputs.asInstanceOf[Iterator[_ <: Product2[Any, Any]]].map(_._2)
```

> **Review comment on this change:** I don't know why, but `data_size` is also repeated between `metrics` and `nativeMetrics` yet handled differently. Not super important though.
>
> **Author reply:** Thanks, will look into that.
||
> **Review comment:** So it's a timer for how long it takes to get a new item from the stream?
>
> **Author reply:** Yes, and in the case of `ShuffleWriterExec`, we know the input is always a `ScanExec` that is reading batches from the JVM.
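The exchange above describes a timer around pulling each item from the upstream stream. A hedged sketch of that idea in plain Scala (no Spark or Comet types; `TimedIterator` is a hypothetical name, not Comet's actual implementation) is an iterator wrapper that charges the time spent in `next()` to a counter:

```scala
// Hypothetical sketch: wrap an iterator so the time spent producing
// each element (e.g. fetching a batch from the JVM side) accrues to
// an elapsed-nanoseconds counter.
class TimedIterator[T](underlying: Iterator[T]) extends Iterator[T] {
  var elapsedNanos: Long = 0L

  override def hasNext: Boolean = underlying.hasNext

  override def next(): T = {
    val start = System.nanoTime()
    try underlying.next()
    finally elapsedNanos += System.nanoTime() - start
  }
}

val timed = new TimedIterator(Iterator(1, 2, 3))
val total = timed.sum // consumes the iterator, timing each next() call
```

Since `ShuffleWriterExec`'s input is always a `ScanExec` reading batches from the JVM, wrapping that input this way attributes exactly the JVM-fetch wait time to the metric rather than the writer's own compute time.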