
[SPARK-26193][SQL][Follow Up] Read metrics rename and display text changes #23286


Closed · wants to merge 3 commits
@@ -92,7 +92,7 @@ private[spark] class ShuffleMapTask(
threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L

- dep.shuffleWriterProcessor.writeProcess(rdd, dep, partitionId, context, partition)
+ dep.shuffleWriterProcessor.write(rdd, dep, partitionId, context, partition)
}

override def preferredLocations: Seq[TaskLocation] = preferredLocs
@@ -41,7 +41,7 @@ private[spark] class ShuffleWriteProcessor extends Serializable with Logging {
* get from [[ShuffleManager]] and triggers rdd compute, finally return the [[MapStatus]] for
* this task.
*/
- def writeProcess(
+ def write(
rdd: RDD[_],
dep: ShuffleDependency[_, _, _],
partitionId: Int,
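Aside (not part of the diff): per the scaladoc above, the renamed `write` keeps the old `writeProcess` behaviour of obtaining a writer for this map task, driving the record iterator, and returning a status. A minimal, self-contained sketch of that shape follows; every name in it (Status, CountingWriter, WriteProcessor) is a stand-in invented for illustration, not the real ShuffleManager/MapStatus API.

// Sketch only: stand-in types illustrating "get a writer, write the records, return a status".
object WriteProcessorSketch {
  final case class Status(partitionId: Int, recordsWritten: Long)

  final class CountingWriter(partitionId: Int) {
    private var count = 0L
    def write(records: Iterator[Any]): Unit = records.foreach(_ => count += 1)
    def stop(): Status = Status(partitionId, count)
  }

  class WriteProcessor {
    // Analogue of ShuffleWriteProcessor.write (formerly writeProcess).
    def write(records: Iterator[Any], partitionId: Int): Status = {
      val writer = new CountingWriter(partitionId)
      writer.write(records)
      writer.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val status = new WriteProcessor().write(Iterator(1, 2, 3), partitionId = 0)
    assert(status == Status(0, 3L))
  }
}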
@@ -22,7 +22,7 @@ import java.util.Arrays
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
- import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleMetricsReporter}
+ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter}

/**
* The [[Partition]] used by [[ShuffledRowRDD]]. A post-shuffle partition
@@ -157,9 +157,9 @@ class ShuffledRowRDD(
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition]
val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
- // `SQLShuffleMetricsReporter` will update its own metrics for SQL exchange operator,
+ // `SQLShuffleReadMetricsReporter` will update its own metrics for SQL exchange operator,
// as well as the `tempMetrics` for basic shuffle metrics.
- val sqlMetricsReporter = new SQLShuffleMetricsReporter(tempMetrics, metrics)
+ val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics)
// The range of pre-shuffle partitions that we are fetching at here is
// [startPreShufflePartitionIndex, endPreShufflePartitionIndex - 1].
val reader =
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Uns
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution._
- import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleMetricsReporter, SQLShuffleWriteMetricsReporter}
+ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.MutablePair
@@ -50,7 +50,7 @@ case class ShuffleExchangeExec(
private lazy val writeMetrics =
SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
private lazy val readMetrics =
- SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+ SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
override lazy val metrics = Map(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")
) ++ readMetrics ++ writeMetrics
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
- import org.apache.spark.sql.execution.metric.{SQLShuffleMetricsReporter, SQLShuffleWriteMetricsReporter}
+ import org.apache.spark.sql.execution.metric.{SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}

/**
* Take the first `limit` elements and collect them to a single partition.
@@ -41,7 +41,7 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
private lazy val writeMetrics =
SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
private lazy val readMetrics =
- SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+ SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
override lazy val metrics = readMetrics ++ writeMetrics
protected override def doExecute(): RDD[InternalRow] = {
val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
@@ -165,7 +165,7 @@ case class TakeOrderedAndProjectExec(
private lazy val writeMetrics =
SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
private lazy val readMetrics =
- SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+ SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
override lazy val metrics = readMetrics ++ writeMetrics

protected override def doExecute(): RDD[InternalRow] = {
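Aside (not part of the diff): the CollectLimitExec body above is the usual two-phase limit, of which the hunk shows only the first half: take `limit` rows in each partition, shuffle the survivors into a single partition, then take `limit` once more. A plain-collections sketch under that reading, with ordinary Seqs standing in for RDD partitions; nothing here is Spark API.

// Two-phase limit, sketched with plain collections instead of RDDs.
object CollectLimitSketch {
  def collectLimit[T](partitions: Seq[Seq[T]], limit: Int): Seq[T] = {
    val locallyLimited = partitions.map(_.take(limit)) // at most `limit` rows per partition
    val singlePartition = locallyLimited.flatten       // "shuffle" every survivor to one partition
    singlePartition.take(limit)                        // final, global limit
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq(1, 2, 3, 4), Seq(5, 6), Seq(7, 8, 9, 10))
    assert(collectLimit(parts, limit = 3) == Seq(1, 2, 3))
  }
}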
@@ -27,23 +27,23 @@ import org.apache.spark.shuffle.ShuffleWriteMetricsReporter
* @param metrics All metrics in current SparkPlan. This param should not empty and
* contains all shuffle metrics defined in createShuffleReadMetrics.
*/
- private[spark] class SQLShuffleMetricsReporter(
+ class SQLShuffleReadMetricsReporter(
tempMetrics: TempShuffleReadMetrics,
metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics {
private[this] val _remoteBlocksFetched =
- metrics(SQLShuffleMetricsReporter.REMOTE_BLOCKS_FETCHED)
+ metrics(SQLShuffleReadMetricsReporter.REMOTE_BLOCKS_FETCHED)
private[this] val _localBlocksFetched =
- metrics(SQLShuffleMetricsReporter.LOCAL_BLOCKS_FETCHED)
+ metrics(SQLShuffleReadMetricsReporter.LOCAL_BLOCKS_FETCHED)
private[this] val _remoteBytesRead =
- metrics(SQLShuffleMetricsReporter.REMOTE_BYTES_READ)
+ metrics(SQLShuffleReadMetricsReporter.REMOTE_BYTES_READ)
private[this] val _remoteBytesReadToDisk =
- metrics(SQLShuffleMetricsReporter.REMOTE_BYTES_READ_TO_DISK)
+ metrics(SQLShuffleReadMetricsReporter.REMOTE_BYTES_READ_TO_DISK)
private[this] val _localBytesRead =
- metrics(SQLShuffleMetricsReporter.LOCAL_BYTES_READ)
+ metrics(SQLShuffleReadMetricsReporter.LOCAL_BYTES_READ)
private[this] val _fetchWaitTime =
- metrics(SQLShuffleMetricsReporter.FETCH_WAIT_TIME)
+ metrics(SQLShuffleReadMetricsReporter.FETCH_WAIT_TIME)
private[this] val _recordsRead =
- metrics(SQLShuffleMetricsReporter.RECORDS_READ)
+ metrics(SQLShuffleReadMetricsReporter.RECORDS_READ)

override def incRemoteBlocksFetched(v: Long): Unit = {
_remoteBlocksFetched.add(v)
@@ -75,7 +75,7 @@ private[spark] class SQLShuffleMetricsReporter(
}
}

- private[spark] object SQLShuffleMetricsReporter {
+ object SQLShuffleReadMetricsReporter {
val REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched"
val LOCAL_BLOCKS_FETCHED = "localBlocksFetched"
val REMOTE_BYTES_READ = "remoteBytesRead"
@@ -88,8 +88,8 @@ private[spark] object SQLShuffleMetricsReporter {
* Create all shuffle read relative metrics and return the Map.
*/
def createShuffleReadMetrics(sc: SparkContext): Map[String, SQLMetric] = Map(
- REMOTE_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "remote blocks fetched"),
- LOCAL_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "local blocks fetched"),
+ REMOTE_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "remote blocks read"),
+ LOCAL_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "local blocks read"),
REMOTE_BYTES_READ -> SQLMetrics.createSizeMetric(sc, "remote bytes read"),
REMOTE_BYTES_READ_TO_DISK -> SQLMetrics.createSizeMetric(sc, "remote bytes read to disk"),
LOCAL_BYTES_READ -> SQLMetrics.createSizeMetric(sc, "local bytes read"),
@@ -102,7 +102,7 @@ private[spark] object SQLShuffleMetricsReporter {
* @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter.
* @param metrics Shuffle write metrics in current SparkPlan.
*/
- private[spark] class SQLShuffleWriteMetricsReporter(
+ class SQLShuffleWriteMetricsReporter(
metricsReporter: ShuffleWriteMetricsReporter,
metrics: Map[String, SQLMetric]) extends ShuffleWriteMetricsReporter {
private[this] val _bytesWritten =
@@ -112,29 +112,29 @@ private[spark] class SQLShuffleWriteMetricsReporter(
private[this] val _writeTime =
metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)

- override private[spark] def incBytesWritten(v: Long): Unit = {
+ override def incBytesWritten(v: Long): Unit = {
metricsReporter.incBytesWritten(v)
_bytesWritten.add(v)
}
- override private[spark] def decRecordsWritten(v: Long): Unit = {
+ override def decRecordsWritten(v: Long): Unit = {
metricsReporter.decBytesWritten(v)
_recordsWritten.set(_recordsWritten.value - v)
}
- override private[spark] def incRecordsWritten(v: Long): Unit = {
+ override def incRecordsWritten(v: Long): Unit = {
metricsReporter.incRecordsWritten(v)
_recordsWritten.add(v)
}
- override private[spark] def incWriteTime(v: Long): Unit = {
+ override def incWriteTime(v: Long): Unit = {
metricsReporter.incWriteTime(v)
_writeTime.add(v)
}
- override private[spark] def decBytesWritten(v: Long): Unit = {
+ override def decBytesWritten(v: Long): Unit = {
metricsReporter.decBytesWritten(v)
_bytesWritten.set(_bytesWritten.value - v)
}
}

- private[spark] object SQLShuffleWriteMetricsReporter {
+ object SQLShuffleWriteMetricsReporter {
val SHUFFLE_BYTES_WRITTEN = "shuffleBytesWritten"
val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten"
val SHUFFLE_WRITE_TIME = "shuffleWriteTime"
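Aside (not part of the diff): the renamed SQLShuffleReadMetricsReporter above is essentially a decorator, as the ShuffledRowRDD comment earlier says: each increment updates the operator's own SQL metric (looked up by a key constant that this PR leaves unchanged) and is also forwarded to the task-level shuffle read metrics. A self-contained sketch of that pattern; SimpleMetric, TaskReadMetrics and SketchReadMetricsReporter are invented stand-ins, not Spark classes.

object ReadMetricsReporterSketch {
  final class SimpleMetric {
    var value = 0L
    def add(v: Long): Unit = value += v
  }

  // Stand-in for the task-level TempShuffleReadMetrics.
  final class TaskReadMetrics {
    val localBlocksFetched = new SimpleMetric
    def incLocalBlocksFetched(v: Long): Unit = localBlocksFetched.add(v)
  }

  val LOCAL_BLOCKS_FETCHED = "localBlocksFetched" // key constant; unchanged by this PR

  // Mirrors the shape of SQLShuffleReadMetricsReporter: update the per-operator SQL metric
  // (displayed as "local blocks read" after this PR) and forward to the task metrics too.
  final class SketchReadMetricsReporter(
      tempMetrics: TaskReadMetrics,
      metrics: Map[String, SimpleMetric]) {
    private[this] val _localBlocksFetched = metrics(LOCAL_BLOCKS_FETCHED)
    def incLocalBlocksFetched(v: Long): Unit = {
      _localBlocksFetched.add(v)
      tempMetrics.incLocalBlocksFetched(v)
    }
  }

  def main(args: Array[String]): Unit = {
    val task = new TaskReadMetrics
    val sql = Map(LOCAL_BLOCKS_FETCHED -> new SimpleMetric)
    new SketchReadMetricsReporter(task, sql).incLocalBlocksFetched(2)
    assert(task.localBlocksFetched.value == 2 && sql(LOCAL_BLOCKS_FETCHED).value == 2)
  }
}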
@@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{LocalSparkSession, Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
- import org.apache.spark.sql.execution.metric.SQLShuffleMetricsReporter
+ import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
import org.apache.spark.sql.types._
import org.apache.spark.storage.ShuffleBlockId
import org.apache.spark.util.collection.ExternalSorter
@@ -140,7 +140,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession {
new UnsafeRowSerializer(2))
val shuffled = new ShuffledRowRDD(
dependency,
- SQLShuffleMetricsReporter.createShuffleReadMetrics(spark.sparkContext))
+ SQLShuffleReadMetricsReporter.createShuffleReadMetrics(spark.sparkContext))
shuffled.count()
}
}
@@ -96,8 +96,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
"avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))
val shuffleExpected1 = Map(
"records read" -> 2L,
- "local blocks fetched" -> 2L,
- "remote blocks fetched" -> 0L,
+ "local blocks read" -> 2L,
+ "remote blocks read" -> 0L,
"shuffle records written" -> 2L)
testSparkPlanMetrics(df, 1, Map(
2L -> (("HashAggregate", expected1(0))),
@@ -114,8 +114,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
"avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))
val shuffleExpected2 = Map(
"records read" -> 4L,
- "local blocks fetched" -> 4L,
- "remote blocks fetched" -> 0L,
+ "local blocks read" -> 4L,
+ "remote blocks read" -> 0L,
"shuffle records written" -> 4L)
testSparkPlanMetrics(df2, 1, Map(
2L -> (("HashAggregate", expected2(0))),
@@ -175,8 +175,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
1L -> (("Exchange", Map(
"shuffle records written" -> 2L,
"records read" -> 2L,
- "local blocks fetched" -> 2L,
- "remote blocks fetched" -> 0L))),
+ "local blocks read" -> 2L,
+ "remote blocks read" -> 0L))),
0L -> (("ObjectHashAggregate", Map("number of output rows" -> 1L))))
)

@@ -187,8 +187,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
1L -> (("Exchange", Map(
"shuffle records written" -> 4L,
"records read" -> 4L,
- "local blocks fetched" -> 4L,
- "remote blocks fetched" -> 0L))),
+ "local blocks read" -> 4L,
+ "remote blocks read" -> 0L))),
0L -> (("ObjectHashAggregate", Map("number of output rows" -> 3L))))
)
}
@@ -216,8 +216,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
"number of output rows" -> 4L))),
2L -> (("Exchange", Map(
"records read" -> 4L,
- "local blocks fetched" -> 2L,
- "remote blocks fetched" -> 0L,
+ "local blocks read" -> 2L,
+ "remote blocks read" -> 0L,
"shuffle records written" -> 2L))))
)
}
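A closing aside on the test changes: within the metrics themselves, only the displayed names change in this follow-up ("fetched" becomes "read"); key constants such as REMOTE_BLOCKS_FETCHED and LOCAL_BLOCKS_FETCHED stay the same, so only expectations keyed by display name, like the maps in SQLMetricsSuite above, need updating. A small runnable sketch of the new expected shape, with values copied from the first test case:

object DisplayTextSketch {
  // Expected shuffle metrics keyed by the *displayed* name; "fetched" becomes "read".
  val shuffleExpected: Map[String, Long] = Map(
    "records read" -> 2L,
    "local blocks read" -> 2L,   // previously "local blocks fetched"
    "remote blocks read" -> 0L,  // previously "remote blocks fetched"
    "shuffle records written" -> 2L)

  def main(args: Array[String]): Unit =
    assert(shuffleExpected("local blocks read") == 2L)
}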