
Commit 589ea26

Add metrics for all join and aggregate operators
1 parent 11caf1c commit 589ea26

14 files changed: +683 -37 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala

Lines changed: 11 additions & 0 deletions

@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.metric.SQLMetrics

 /**
  * :: DeveloperApi ::
@@ -45,6 +46,10 @@ case class Aggregate(
     child: SparkPlan)
   extends UnaryNode {

+  override private[sql] lazy val metrics = Map(
+    "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
   override def requiredChildDistribution: List[Distribution] = {
     if (partial) {
       UnspecifiedDistribution :: Nil
@@ -121,12 +126,15 @@ case class Aggregate(
   }

   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
+    val numInputRows = longMetric("numInputRows")
+    val numOutputRows = longMetric("numOutputRows")
     if (groupingExpressions.isEmpty) {
       child.execute().mapPartitions { iter =>
         val buffer = newAggregateBuffer()
         var currentRow: InternalRow = null
         while (iter.hasNext) {
           currentRow = iter.next()
+          numInputRows += 1
           var i = 0
           while (i < buffer.length) {
             buffer(i).update(currentRow)
@@ -142,6 +150,7 @@ case class Aggregate(
           i += 1
         }

+        numOutputRows += 1
         Iterator(resultProjection(aggregateResults))
       }
     } else {
@@ -152,6 +161,7 @@ case class Aggregate(
         var currentRow: InternalRow = null
         while (iter.hasNext) {
           currentRow = iter.next()
+          numInputRows += 1
           val currentGroup = groupingProjection(currentRow)
           var currentBuffer = hashTable.get(currentGroup)
           if (currentBuffer == null) {
@@ -180,6 +190,7 @@ case class Aggregate(
             val currentEntry = hashTableIter.next()
             val currentGroup = currentEntry.getKey
             val currentBuffer = currentEntry.getValue
+            numOutputRows += 1

             var i = 0
             while (i < currentBuffer.length) {
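
The same instrumentation pattern repeats in every operator this commit touches: the plan node declares named metrics via SQLMetrics.createLongMetric, pulls them into local vals with longMetric(...) inside doExecute, and increments them once per row. SQLMetrics is internal to Spark SQL, so the standalone sketch below illustrates the same row-counting idea with a plain long accumulator from the public Spark 1.x API; the object name, metric names, and toy "aggregation" are made up for the example.

import org.apache.spark.{SparkConf, SparkContext}

object RowCountingSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("row-counting-sketch").setMaster("local[2]"))

    // Stand-ins for the operator's "numInputRows" / "numOutputRows" SQLMetrics.
    val numInputRows = sc.accumulator(0L, "number of input rows")
    val numOutputRows = sc.accumulator(0L, "number of output rows")

    val input = sc.parallelize(1 to 1000, numSlices = 4)

    // Count rows as they flow through, the way the patch bumps the metrics in the
    // operator's row loop, then keep only a subset as the "output".
    val output = input.map { row =>
      numInputRows += 1L
      row
    }.filter(_ % 2 == 0).map { row =>
      numOutputRows += 1L
      row
    }

    output.count() // an action forces evaluation, which populates the accumulators
    println(s"input rows = ${numInputRows.value}, output rows = ${numOutputRows.value}")
    sc.stop()
  }
}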

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala

Lines changed: 17 additions & 2 deletions

@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.physical.{UnspecifiedDistribution, ClusteredDistribution, AllTuples, Distribution}
 import org.apache.spark.sql.execution.{UnsafeFixedWidthAggregationMap, SparkPlan, UnaryNode}
+import org.apache.spark.sql.metric.SQLMetrics
 import org.apache.spark.sql.types.StructType

 case class SortBasedAggregate(
@@ -38,6 +39,10 @@ case class SortBasedAggregate(
     child: SparkPlan)
   extends UnaryNode {

+  override private[sql] lazy val metrics = Map(
+    "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
   override def outputsUnsafeRows: Boolean = false

   override def canProcessUnsafeRows: Boolean = false
@@ -63,7 +68,13 @@ case class SortBasedAggregate(
   }

   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
-    child.execute().mapPartitions { iter =>
+    val numInputRows = longMetric("numInputRows")
+    val numOutputRows = longMetric("numOutputRows")
+    child.execute().mapPartitions { _iter =>
+      val iter = _iter.map { row =>
+        numInputRows += 1
+        row
+      }
       // Because the constructor of an aggregation iterator will read at least the first row,
       // we need to get the value of iter.hasNext first.
       val hasInput = iter.hasNext
@@ -88,9 +99,13 @@ case class SortBasedAggregate(
       if (!hasInput && groupingExpressions.isEmpty) {
         // There is no input and there is no grouping expressions.
         // We need to output a single row as the output.
+        numOutputRows += 1
         Iterator[InternalRow](outputIter.outputForEmptyGroupingKeyWithoutInput())
       } else {
-        outputIter
+        outputIter.map { row =>
+          numOutputRows += 1
+          row
+        }
       }
     }
   }
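
SortBasedAggregate (and TungstenAggregate below) cannot increment the counter inside an explicit row loop, because the aggregation iterator consumes the input itself; the patch instead renames the partition iterator to _iter and wraps it in an identity map that bumps the metric as each row is pulled. A small REPL-style sketch of that trick, with illustrative names:

// Wrapping an Iterator in .map keeps it lazy and single-pass while recording how
// many elements are pulled through it; nothing is counted until a downstream
// consumer (here, the aggregation iterator) actually advances it.
def countingIterator[T](iter: Iterator[T])(onRow: () => Unit): Iterator[T] =
  iter.map { row => onRow(); row }

var seen = 0L
val wrapped = countingIterator(Iterator(1, 2, 3))(() => seen += 1)
assert(seen == 0L)       // nothing consumed yet
wrapped.foreach(_ => ()) // drain the iterator
assert(seen == 3L)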

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala

Lines changed: 17 additions & 2 deletions

@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression2
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{UnspecifiedDistribution, ClusteredDistribution, AllTuples, Distribution}
 import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
+import org.apache.spark.sql.metric.SQLMetrics

 case class TungstenAggregate(
     requiredChildDistributionExpressions: Option[Seq[Expression]],
@@ -35,6 +36,10 @@ case class TungstenAggregate(
     child: SparkPlan)
   extends UnaryNode {

+  override private[sql] lazy val metrics = Map(
+    "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
   override def outputsUnsafeRows: Boolean = true

   override def canProcessUnsafeRows: Boolean = true
@@ -61,7 +66,13 @@ case class TungstenAggregate(
   }

   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
-    child.execute().mapPartitions { iter =>
+    val numInputRows = longMetric("numInputRows")
+    val numOutputRows = longMetric("numOutputRows")
+    child.execute().mapPartitions { _iter =>
+      val iter = _iter.map { row =>
+        numInputRows += 1
+        row
+      }
       val hasInput = iter.hasNext
       if (!hasInput && groupingExpressions.nonEmpty) {
         // This is a grouped aggregate and the input iterator is empty,
@@ -81,9 +92,13 @@ case class TungstenAggregate(
         testFallbackStartsAt)

       if (!hasInput && groupingExpressions.isEmpty) {
+        numOutputRows += 1
         Iterator.single[UnsafeRow](aggregationIterator.outputForEmptyGroupingKeyWithoutInput())
       } else {
-        aggregationIterator
+        aggregationIterator.map { row =>
+          numOutputRows += 1
+          row
+        }
       }
     }
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala

Lines changed: 27 additions & 3 deletions

@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.physical.{Distribution, Partitioning, UnspecifiedDistribution}
 import org.apache.spark.sql.execution.{BinaryNode, SQLExecution, SparkPlan}
+import org.apache.spark.sql.metric.SQLMetrics
 import org.apache.spark.util.ThreadUtils
 import org.apache.spark.{InternalAccumulator, TaskContext}

@@ -45,7 +46,10 @@ case class BroadcastHashJoin(
     right: SparkPlan)
   extends BinaryNode with HashJoin {

-  override protected[sql] val trackNumOfRowsEnabled = true
+  override private[sql] lazy val metrics = Map(
+    "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+    "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))

   val timeout: Duration = {
     val timeoutValue = sqlContext.conf.broadcastTimeout
@@ -65,6 +69,11 @@ case class BroadcastHashJoin(
   // for the same query.
   @transient
   private lazy val broadcastFuture = {
+    val numBuildRows = buildSide match {
+      case BuildLeft => longMetric("numLeftRows")
+      case BuildRight => longMetric("numRightRows")
+    }
+
     // broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
     future {
@@ -73,7 +82,10 @@ case class BroadcastHashJoin(
       SQLExecution.withExecutionId(sparkContext, executionId) {
         // Note that we use .execute().collect() because we don't want to convert data to Scala
         // types
-        val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect()
+        val input: Array[InternalRow] = buildPlan.execute().map { row =>
+          numBuildRows += 1
+          row.copy()
+        }.collect()
         val hashed = HashedRelation(input.iterator, buildSideKeyGenerator, input.size)
         sparkContext.broadcast(hashed)
       }
@@ -85,6 +97,12 @@ case class BroadcastHashJoin(
   }

   protected override def doExecute(): RDD[InternalRow] = {
+    val numStreamedRows = buildSide match {
+      case BuildLeft => longMetric("numRightRows")
+      case BuildRight => longMetric("numLeftRows")
+    }
+    val numOutputRows = longMetric("numOutputRows")
+
     val broadcastRelation = Await.result(broadcastFuture, timeout)

     streamedPlan.execute().mapPartitions { streamedIter =>
@@ -95,7 +113,13 @@ case class BroadcastHashJoin(
             InternalAccumulator.PEAK_EXECUTION_MEMORY).add(unsafe.getUnsafeSize)
         case _ =>
       }
-      hashJoin(streamedIter, hashedRelation)
+      hashJoin(streamedIter.map { row =>
+        numStreamedRows += 1
+        row
+      }, hashedRelation).map { row =>
+        numOutputRows += 1
+        row
+      }
     }
   }
 }
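
For the broadcast joins the two inputs are counted in different places: the build side is counted while it is copied and collected for broadcasting, and the streamed side is counted as it flows through mapPartitions, which is why the metric to use is chosen by matching on buildSide (or on joinType in the outer-join variant below). A rough standalone sketch of counting a build side during collect(), with a public accumulator standing in for the internal metric and made-up names:

import org.apache.spark.{SparkConf, SparkContext}

object BuildSideCountSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("build-side-count").setMaster("local[2]"))

    // Stand-in for the join's "numLeftRows" or "numRightRows" metric, whichever
    // side is being built and broadcast.
    val numBuildRows = sc.accumulator(0L, "number of build-side rows")

    val buildSide = sc.parallelize(Seq("a", "b", "c", "d"))

    // The identity .map bumps the counter on the executors as rows are copied;
    // collect() is the action that runs it, so the value is only visible afterwards.
    val collected = buildSide.map { row =>
      numBuildRows += 1L
      row
    }.collect()

    println(s"collected ${collected.length} rows, metric says ${numBuildRows.value}")
    sc.stop()
  }
}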

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala

Lines changed: 35 additions & 2 deletions

@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{Distribution, Partitioning, UnspecifiedDistribution}
 import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
 import org.apache.spark.sql.execution.{BinaryNode, SQLExecution, SparkPlan}
+import org.apache.spark.sql.metric.SQLMetrics
 import org.apache.spark.{InternalAccumulator, TaskContext}

 /**
@@ -45,6 +46,11 @@ case class BroadcastHashOuterJoin(
     left: SparkPlan,
     right: SparkPlan) extends BinaryNode with HashOuterJoin {

+  override private[sql] lazy val metrics = Map(
+    "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+    "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
   val timeout = {
     val timeoutValue = sqlContext.conf.broadcastTimeout
     if (timeoutValue < 0) {
@@ -63,6 +69,14 @@ case class BroadcastHashOuterJoin(
   // for the same query.
   @transient
   private lazy val broadcastFuture = {
+    val numBuildRows = joinType match {
+      case RightOuter => longMetric("numLeftRows")
+      case LeftOuter => longMetric("numRightRows")
+      case x =>
+        throw new IllegalArgumentException(
+          s"HashOuterJoin should not take $x as the JoinType")
+    }
+
     // broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
     future {
@@ -71,7 +85,10 @@ case class BroadcastHashOuterJoin(
       SQLExecution.withExecutionId(sparkContext, executionId) {
         // Note that we use .execute().collect() because we don't want to convert data to Scala
         // types
-        val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect()
+        val input: Array[InternalRow] = buildPlan.execute().map { row =>
+          numBuildRows += 1
+          row.copy()
+        }.collect()
         val hashed = HashedRelation(input.iterator, buildKeyGenerator, input.size)
         sparkContext.broadcast(hashed)
       }
@@ -83,9 +100,22 @@ case class BroadcastHashOuterJoin(
   }

   override def doExecute(): RDD[InternalRow] = {
+    val numStreamedRows = joinType match {
+      case RightOuter => longMetric("numRightRows")
+      case LeftOuter => longMetric("numLeftRows")
+      case x =>
+        throw new IllegalArgumentException(
+          s"HashOuterJoin should not take $x as the JoinType")
+    }
+    val numOutputRows = longMetric("numOutputRows")
+
     val broadcastRelation = Await.result(broadcastFuture, timeout)

-    streamedPlan.execute().mapPartitions { streamedIter =>
+    streamedPlan.execute().mapPartitions { _streamedIter =>
+      val streamedIter = _streamedIter.map { row =>
+        numStreamedRows += 1
+        row
+      }
       val joinedRow = new JoinedRow()
       val hashTable = broadcastRelation.value
       val keyGenerator = streamedKeyGenerator
@@ -117,6 +147,9 @@ case class BroadcastHashOuterJoin(
           throw new IllegalArgumentException(
             s"BroadcastHashOuterJoin should not take $x as the JoinType")
+      }.map { row =>
+        numOutputRows += 1
+        row
       }
     }
   }
 }
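
BroadcastHashOuterJoin only supports left and right outer joins, so which of the two row metrics counts the build side is derived from joinType, and any other join type is rejected up front, as in the guards above. A REPL-style sketch of just that selection; the JoinType hierarchy here is a local stand-in for Spark's, and the function returns the metric name as a plain string:

sealed trait JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType

// For a right outer join the left side is built and broadcast, and vice versa.
def buildSideMetricName(joinType: JoinType): String = joinType match {
  case RightOuter => "numLeftRows"
  case LeftOuter  => "numRightRows"
  case x => throw new IllegalArgumentException(
    s"HashOuterJoin should not take $x as the JoinType")
}

assert(buildSideMetricName(RightOuter) == "numLeftRows")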

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala

Lines changed: 32 additions & 5 deletions

@@ -23,6 +23,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.metric.SQLMetrics

 /**
  * :: DeveloperApi ::
@@ -37,29 +38,55 @@ case class BroadcastLeftSemiJoinHash(
     right: SparkPlan,
     condition: Option[Expression]) extends BinaryNode with HashSemiJoin {

+  override private[sql] lazy val metrics = Map(
+    "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+    "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
   protected override def doExecute(): RDD[InternalRow] = {
-    val input = right.execute().map(_.copy()).collect()
+    val numLeftRows = longMetric("numLeftRows")
+    val numRightRows = longMetric("numRightRows")
+    val numOutputRows = longMetric("numOutputRows")
+
+    val input = right.execute().map { row =>
+      numRightRows += 1
+      row.copy()
+    }.collect()

     if (condition.isEmpty) {
       val hashSet = buildKeyHashSet(input.toIterator)
       val broadcastedRelation = sparkContext.broadcast(hashSet)

-      left.execute().mapPartitions { streamIter =>
-        hashSemiJoin(streamIter, broadcastedRelation.value)
+      left.execute().mapPartitions { _streamIter =>
+        val streamIter = _streamIter.map { row =>
+          numLeftRows += 1
+          row
+        }
+        hashSemiJoin(streamIter, broadcastedRelation.value).map { row =>
+          numOutputRows += 1
+          row
+        }
       }
     } else {
       val hashRelation = HashedRelation(input.toIterator, rightKeyGenerator, input.size)
       val broadcastedRelation = sparkContext.broadcast(hashRelation)

-      left.execute().mapPartitions { streamIter =>
+      left.execute().mapPartitions { _streamIter =>
+        val streamIter = _streamIter.map { row =>
+          numLeftRows += 1
+          row
+        }
         val hashedRelation = broadcastedRelation.value
         hashedRelation match {
           case unsafe: UnsafeHashedRelation =>
             TaskContext.get().internalMetricsToAccumulators(
               InternalAccumulator.PEAK_EXECUTION_MEMORY).add(unsafe.getUnsafeSize)
           case _ =>
         }
-        hashSemiJoin(streamIter, hashedRelation)
+        hashSemiJoin(streamIter, hashedRelation).map { row =>
+          numOutputRows += 1
+          row
+        }
       }
     }
   }
