Skip to content

Commit 2682f74

Browse files
authored
SPARK-723 Add many more metrics to dispatcher (apache#24)
* SPARK-723 Add many more metrics to dispatcher - Counters: The total number of times that submissions have entered states - Timers: The duration from submit or launch until a submission entered a given state - Histogram: The retry counts at time of retry * Fixes to handling finished drivers - Rename 'failed' case to 'exception' - When a driver is 'finished', record its final MesosTaskState - Fix naming consistency after seeing how they look in practice * Register "finished" counters up-front Otherwise their values are never published.
1 parent d79b7f8 commit 2682f74

File tree

2 files changed

+163
-8
lines changed

2 files changed

+163
-8
lines changed

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,9 @@ private[spark] class MesosClusterScheduler(
125125
conf: SparkConf)
126126
extends Scheduler with MesosSchedulerUtils {
127127
var frameworkUrl: String = _
128+
private val metricsSource = new MesosClusterSchedulerSource(this)
128129
private val metricsSystem =
129-
MetricsSystem.createMetricsSystem("mesos_cluster", conf, new SecurityManager(conf))
130+
MetricsSystem.createMetricsSystem(metricsSource.sourceName, conf, new SecurityManager(conf))
130131
private val master = conf.get("spark.master")
131132
private val appName = conf.get("spark.app.name")
132133
private val queuedCapacity = conf.getInt("spark.mesos.maxDrivers", 200)
@@ -308,7 +309,7 @@ private[spark] class MesosClusterScheduler(
308309
frameworkId = id
309310
}
310311
recoverState()
311-
metricsSystem.registerSource(new MesosClusterSchedulerSource(this))
312+
metricsSystem.registerSource(metricsSource)
312313
metricsSystem.start()
313314
val driver = createSchedulerDriver(
314315
master,
@@ -640,12 +641,14 @@ private[spark] class MesosClusterScheduler(
640641
new Date(),
641642
None,
642643
getDriverFrameworkID(submission))
644+
metricsSource.recordLaunchedDriver(submission)
643645
launchedDrivers(submission.submissionId) = newState
644646
launchedDriversState.persist(submission.submissionId, newState)
645647
afterLaunchCallback(submission.submissionId)
646648
} catch {
647649
case e: SparkException =>
648650
afterLaunchCallback(submission.submissionId)
651+
metricsSource.recordExceptionDriver(submission)
649652
finishedDrivers += new MesosClusterSubmissionState(
650653
submission,
651654
TaskID.newBuilder().setValue(submission.submissionId).build(),
@@ -767,8 +770,10 @@ private[spark] class MesosClusterScheduler(
767770
val nextRetry = new Date(new Date().getTime + waitTimeSec * 1000L)
768771
val newDriverDescription = state.driverDescription.copy(
769772
retryState = Some(new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec)))
773+
metricsSource.recordRetryingDriver(state)
770774
addDriverToPending(newDriverDescription, newDriverDescription.submissionId)
771775
} else if (TaskState.isFinished(mesosToTaskState(status.getState))) {
776+
metricsSource.recordFinishedDriver(state, status.getState)
772777
retireDriver(subId, state)
773778
}
774779
state.mesosTaskStatus = Option(status)
@@ -836,9 +841,11 @@ private[spark] class MesosClusterScheduler(
836841
def getQueuedDriversSize: Int = queuedDrivers.size
837842
def getLaunchedDriversSize: Int = launchedDrivers.size
838843
def getPendingRetryDriversSize: Int = pendingRetryDrivers.size
844+
def getFinishedDriversSize: Int = finishedDrivers.size
839845

840846
private def addDriverToQueue(desc: MesosDriverDescription): Unit = {
841847
queuedDriversState.persist(desc.submissionId, desc)
848+
metricsSource.recordQueuedDriver()
842849
queuedDrivers += desc
843850
revive()
844851
}

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala

Lines changed: 154 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,173 @@
1717

1818
package org.apache.spark.scheduler.cluster.mesos

import java.util.Date
import java.util.concurrent.TimeUnit

import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer}
import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}

import org.apache.spark.TaskState
import org.apache.spark.deploy.mesos.MesosDriverDescription
import org.apache.spark.metrics.source.Source

/**
 * Metrics source for the Mesos cluster dispatcher. Publishes gauges reflecting the
 * scheduler's current queue sizes, plus event-driven counters/timers/histograms that
 * track submissions as they move between states.
 */
private[mesos] class MesosClusterSchedulerSource(scheduler: MesosClusterScheduler)
  extends Source with MesosSchedulerUtils {

  // Submission state transitions, to derive metrics from:
  // - submit():
  //     From: NULL
  //     To: queuedDrivers
  // - offers/scheduleTasks():
  //     From: queuedDrivers and any pendingRetryDrivers scheduled for retry
  //     To: launchedDrivers if success, or
  //         finishedDrivers(fail) if exception
  // - taskStatus/statusUpdate():
  //     From: launchedDrivers
  //     To: finishedDrivers(success) if success (or fail and not eligible to retry), or
  //         pendingRetryDrivers if failed (and eligible to retry)
  // - pruning/retireDriver():
  //     From: finishedDrivers
  //     To: NULL

  override val sourceName: String = "mesos_cluster"
  override val metricRegistry: MetricRegistry = new MetricRegistry

  // PULL METRICS:
  // These gauge metrics are periodically polled/pulled by the metrics system.

  metricRegistry.register(MetricRegistry.name("driver", "waiting"), new Gauge[Int] {
    override def getValue: Int = scheduler.getQueuedDriversSize
  })

  metricRegistry.register(MetricRegistry.name("driver", "launched"), new Gauge[Int] {
    override def getValue: Int = scheduler.getLaunchedDriversSize
  })

  metricRegistry.register(MetricRegistry.name("driver", "retry"), new Gauge[Int] {
    override def getValue: Int = scheduler.getPendingRetryDriversSize
  })

  metricRegistry.register(MetricRegistry.name("driver", "finished"), new Gauge[Int] {
    override def getValue: Int = scheduler.getFinishedDriversSize
  })

  // PUSH METRICS:
  // These metrics are updated directly as events occur.

  private val queuedCounter = metricRegistry.counter(MetricRegistry.name("driver", "waiting_count"))
  private val launchedCounter =
    metricRegistry.counter(MetricRegistry.name("driver", "launched_count"))
  private val retryCounter = metricRegistry.counter(MetricRegistry.name("driver", "retry_count"))
  private val exceptionCounter =
    metricRegistry.counter(MetricRegistry.name("driver", "exception_count"))
  private val finishedCounter =
    metricRegistry.counter(MetricRegistry.name("driver", "finished_count"))

  // Same as finishedCounter above, except grouped by MesosTaskState.
  // Registered eagerly (up-front) so that states which are never hit still publish zero values.
  private val finishedMesosStateCounters: Map[MesosTaskState, Counter] = MesosTaskState.values
    // Avoid registering 'finished' metrics for states that aren't considered finished:
    .filter(state => TaskState.isFinished(mesosToTaskState(state)))
    .map(state => (state, metricRegistry.counter(
      MetricRegistry.name("driver", "finished_count_mesos_state", state.name.toLowerCase))))
    .toMap
  private val finishedMesosUnknownStateCounter =
    metricRegistry.counter(MetricRegistry.name("driver", "finished_count_mesos_state", "UNKNOWN"))

  // Duration from submission to FIRST launch.
  // This omits retries since those would exaggerate the time since original submission.
  private val submitToFirstLaunch =
    metricRegistry.timer(MetricRegistry.name("driver", "submit_to_first_launch"))
  // Duration from initial submission to an exception.
  private val submitToException =
    metricRegistry.timer(MetricRegistry.name("driver", "submit_to_exception"))

  // Duration from (most recent) launch to a retry.
  private val launchToRetry = metricRegistry.timer(MetricRegistry.name("driver", "launch_to_retry"))

  // Duration from initial submission to finished.
  private val submitToFinish =
    metricRegistry.timer(MetricRegistry.name("driver", "submit_to_finish"))
  // Duration from (most recent) launch to finished.
  private val launchToFinish =
    metricRegistry.timer(MetricRegistry.name("driver", "launch_to_finish"))

  // Same as submitToFinish and launchToFinish above, except grouped by Spark TaskState.
  private class FinishStateTimers(state: String) {
    val submitToFinish =
      metricRegistry.timer(MetricRegistry.name("driver", "submit_to_finish_state", state))
    val launchToFinish =
      metricRegistry.timer(MetricRegistry.name("driver", "launch_to_finish_state", state))
  }
  // Registered eagerly, mirroring finishedMesosStateCounters, so unused states publish zeros.
  private val finishSparkStateTimers = TaskState.values
    // Avoid registering 'finished' metrics for states that aren't considered finished:
    .filter(state => TaskState.isFinished(state))
    .map(state => (state, new FinishStateTimers(state.toString.toLowerCase)))
    .toMap
  private val submitToFinishUnknownState = metricRegistry.timer(
    MetricRegistry.name("driver", "submit_to_finish_state", "UNKNOWN"))
  private val launchToFinishUnknownState = metricRegistry.timer(
    MetricRegistry.name("driver", "launch_to_finish_state", "UNKNOWN"))

  // Histogram of retry counts at retry scheduling.
  private val retryCount = metricRegistry.histogram(MetricRegistry.name("driver", "retry_counts"))

  /** Records when a submission initially enters the launch queue. */
  def recordQueuedDriver(): Unit = queuedCounter.inc()

  /** Records when a submission has failed an attempt and is eligible to be retried. */
  def recordRetryingDriver(state: MesosClusterSubmissionState): Unit = {
    state.driverDescription.retryState.foreach(retryState => retryCount.update(retryState.retries))
    recordTimeSince(state.startDate, launchToRetry)
    retryCounter.inc()
  }

  /** Records when a submission is launched. */
  def recordLaunchedDriver(desc: MesosDriverDescription): Unit = {
    // Only time the FIRST launch; retries would exaggerate time since original submission.
    if (desc.retryState.isEmpty) {
      recordTimeSince(desc.submissionDate, submitToFirstLaunch)
    }
    launchedCounter.inc()
  }

  /**
   * Records when a submission has successfully finished, or failed and was not eligible
   * for retry. Updates the overall finish metrics plus the per-state timers/counters.
   */
  def recordFinishedDriver(state: MesosClusterSubmissionState, mesosState: MesosTaskState): Unit = {
    finishedCounter.inc()

    recordTimeSince(state.driverDescription.submissionDate, submitToFinish)
    recordTimeSince(state.startDate, launchToFinish)

    // Timers grouped by Spark TaskState; fall back to UNKNOWN for unexpected states.
    val sparkState = mesosToTaskState(mesosState)
    finishSparkStateTimers.get(sparkState) match {
      case Some(timers) =>
        recordTimeSince(state.driverDescription.submissionDate, timers.submitToFinish)
        recordTimeSince(state.startDate, timers.launchToFinish)
      case None =>
        recordTimeSince(state.driverDescription.submissionDate, submitToFinishUnknownState)
        recordTimeSince(state.startDate, launchToFinishUnknownState)
    }

    // Counter grouped by MesosTaskState; fall back to the UNKNOWN counter.
    finishedMesosStateCounters.getOrElse(mesosState, finishedMesosUnknownStateCounter).inc()
  }

  /** Records when a submission has terminally failed due to an exception at construction. */
  def recordExceptionDriver(desc: MesosDriverDescription): Unit = {
    recordTimeSince(desc.submissionDate, submitToException)
    exceptionCounter.inc()
  }

  // Updates the given timer with the wall-clock time elapsed since `date`.
  private def recordTimeSince(date: Date, timer: Timer): Unit =
    timer.update(System.currentTimeMillis - date.getTime, TimeUnit.MILLISECONDS)
}

0 commit comments

Comments
 (0)