Skip to content

Commit a6c8e18

Browse files
yugan95Yu Gan
authored andcommitted
apache#120 shuffle records read (apache#127)
Co-authored-by: Yu Gan <yu.gan@kyligence.io>
1 parent 85cf38e commit a6c8e18

File tree

1 file changed

+8
-2
lines changed

1 file changed

+8
-2
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,13 @@ class SQLAppStatusListener(
239239
return
240240
}
241241

242-
val updates = accumUpdates
242+
val updates = if (skewDetectEnabled) {
243+
import InternalAccumulator._
244+
accumUpdates.filter { acc =>
245+
acc.update.isDefined && (metrics.accumulatorIds.contains(acc.id) ||
246+
shuffleRead.RECORDS_READ.equals(acc.name.orNull))
247+
}.sortBy(_.id)
248+
} else accumUpdates
243249
.filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id) }
244250
.sortBy(_.id)
245251

@@ -252,7 +258,7 @@ class SQLAppStatusListener(
252258
val values = new Array[Long](updates.size)
253259
updates.zipWithIndex.foreach { case (acc, idx) =>
254260
ids(idx) = acc.id
255-
names(idx) = acc.name.get
261+
names(idx) = acc.name.orNull
256262
// In a live application, accumulators have Long values, but when reading from event
257263
// logs, they have String values. For now, assume all accumulators are Long and covert
258264
// accordingly.

0 commit comments

Comments
 (0)