Skip to content

Commit 0e014b0

Browse files
committed
[SPARK-17649][CORE] Log how many Spark events got dropped in LiveListenerBus
Log how many Spark events got dropped in LiveListenerBus so that the user can get insights on how to set a correct event queue size. Jenkins Author: Shixiong Zhu <shixiong@databricks.com> Closes #15220 from zsxwing/SPARK-17649.
1 parent 94524ce commit 0e014b0

File tree

1 file changed

+26
-1
lines changed

1 file changed

+26
-1
lines changed

core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
package org.apache.spark.util
1919

2020
import java.util.concurrent._
21-
import java.util.concurrent.atomic.AtomicBoolean
21+
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
22+
2223
import scala.util.DynamicVariable
2324

2425
import org.apache.spark.SparkContext
@@ -51,6 +52,12 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
5152
// Indicate if `stop()` is called
5253
private val stopped = new AtomicBoolean(false)
5354

55+
/** A counter for dropped events. It will be reset every time we log it. */
56+
private val droppedEventsCounter = new AtomicLong(0L)
57+
58+
/** When `droppedEventsCounter` was logged last time in milliseconds. */
59+
@volatile private var lastReportTimestamp = 0L
60+
5461
// Indicate if we are processing some event
5562
// Guarded by `self`
5663
private var processingEvent = false
@@ -117,6 +124,24 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
117124
eventLock.release()
118125
} else {
119126
onDropEvent(event)
127+
droppedEventsCounter.incrementAndGet()
128+
}
129+
130+
val droppedEvents = droppedEventsCounter.get
131+
if (droppedEvents > 0) {
132+
// Don't log too frequently
133+
if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
134+
// There may be multiple threads trying to decrease droppedEventsCounter.
135+
// Use "compareAndSet" to make sure only one thread can win.
136+
// And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
137+
// then that thread will update it.
138+
if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
139+
val prevLastReportTimestamp = lastReportTimestamp
140+
lastReportTimestamp = System.currentTimeMillis()
141+
logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
142+
new java.util.Date(prevLastReportTimestamp))
143+
}
144+
}
120145
}
121146
}
122147

0 commit comments

Comments
 (0)