Skip to content

Commit 7879e66

Browse files
authored
Merge pull request #1 from squito/metric_enums
Metric enums
2 parents 10ed328 + c502ec4 commit 7879e66

File tree

10 files changed

+143
-210
lines changed

10 files changed

+143
-210
lines changed

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 10 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,12 @@ package org.apache.spark.executor
1919

2020
import java.io.{File, NotSerializableException}
2121
import java.lang.Thread.UncaughtExceptionHandler
22-
import java.lang.management.{BufferPoolMXBean, ManagementFactory}
22+
import java.lang.management.ManagementFactory
2323
import java.net.{URI, URL}
2424
import java.nio.ByteBuffer
2525
import java.util.Properties
2626
import java.util.concurrent._
2727
import javax.annotation.concurrent.GuardedBy
28-
import javax.management.ObjectName
2928

3029
import scala.collection.JavaConverters._
3130
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
@@ -38,8 +37,9 @@ import org.apache.spark.deploy.SparkHadoopUtil
3837
import org.apache.spark.internal.Logging
3938
import org.apache.spark.internal.config._
4039
import org.apache.spark.memory.{MemoryManager, SparkOutOfMemoryError, TaskMemoryManager}
40+
import org.apache.spark.metrics.MetricGetter
4141
import org.apache.spark.rpc.RpcTimeout
42-
import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task, TaskDescription}
42+
import org.apache.spark.scheduler._
4343
import org.apache.spark.shuffle.FetchFailedException
4444
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
4545
import org.apache.spark.util._
@@ -72,12 +72,6 @@ private[spark] class Executor(
7272

7373
private val conf = env.conf
7474

75-
// BufferPoolMXBean for direct memory
76-
private val directBufferPool = Executor.getBufferPool(Executor.DIRECT_BUFFER_POOL_NAME)
77-
78-
// BufferPoolMXBean for mapped memory
79-
private val mappedBufferPool = Executor.getBufferPool(Executor.MAPPED_BUFFER_POOL_NAME)
80-
8175
// No ip or host:port - just hostname
8276
Utils.checkHost(executorHostname)
8377
// must not have port specified.
@@ -780,8 +774,7 @@ private[spark] class Executor(
780774
val curGCTime = computeTotalGcTime()
781775

782776
// get executor level memory metrics
783-
val executorUpdates = Executor.getCurrentExecutorMetrics(env.memoryManager,
784-
directBufferPool, mappedBufferPool)
777+
val executorUpdates = Executor.getCurrentExecutorMetrics(env.memoryManager)
785778

786779
for (taskRunner <- runningTasks.values().asScala) {
787780
if (taskRunner.task != null) {
@@ -820,42 +813,14 @@ private[spark] object Executor {
820813
// used instead.
821814
val taskDeserializationProps: ThreadLocal[Properties] = new ThreadLocal[Properties]
822815

823-
val DIRECT_BUFFER_POOL_NAME = "direct"
824-
val MAPPED_BUFFER_POOL_NAME = "mapped"
825-
826-
/** Get the BufferPoolMXBean for the specified buffer pool. */
827-
def getBufferPool(pool: String): BufferPoolMXBean = {
828-
val name = new ObjectName("java.nio:type=BufferPool,name=" + pool)
829-
ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer,
830-
name.toString, classOf[BufferPoolMXBean])
831-
}
832-
833816
/**
834817
* Get the current executor level memory metrics.
835-
*
836-
* @param memoryManager the memory manager
837-
* @param direct the direct memory buffer pool
838-
* @param mapped the mapped memory buffer pool
839-
* @return the executor memory metrics
840818
*/
841-
def getCurrentExecutorMetrics(
842-
memoryManager: MemoryManager,
843-
direct: BufferPoolMXBean,
844-
mapped: BufferPoolMXBean) : ExecutorMetrics = {
845-
val onHeapExecutionMemoryUsed = memoryManager.onHeapExecutionMemoryUsed
846-
val offHeapExecutionMemoryUsed = memoryManager.offHeapExecutionMemoryUsed
847-
val onHeapStorageMemoryUsed = memoryManager.onHeapStorageMemoryUsed
848-
val offHeapStorageMemoryUsed = memoryManager.offHeapStorageMemoryUsed
849-
new ExecutorMetrics(System.currentTimeMillis(),
850-
ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(),
851-
ManagementFactory.getMemoryMXBean.getNonHeapMemoryUsage().getUsed(),
852-
onHeapExecutionMemoryUsed,
853-
offHeapExecutionMemoryUsed,
854-
onHeapStorageMemoryUsed,
855-
offHeapStorageMemoryUsed,
856-
onHeapExecutionMemoryUsed + onHeapStorageMemoryUsed, // on heap unified memory
857-
offHeapExecutionMemoryUsed + offHeapStorageMemoryUsed, // off heap unified memory
858-
direct.getMemoryUsed,
859-
mapped.getMemoryUsed)
819+
def getCurrentExecutorMetrics(memoryManager: MemoryManager): ExecutorMetrics = {
820+
val metrics = new ExecutorMetrics(System.currentTimeMillis())
821+
MetricGetter.idxAndValues.foreach { case (idx, metric) =>
822+
metrics.metrics(idx) = metric.getMetricValue(memoryManager)
823+
}
824+
metrics
860825
}
861826
}

core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.executor
1919

2020
import org.apache.spark.annotation.DeveloperApi
21+
import org.apache.spark.metrics.MetricGetter
2122

2223
/**
2324
* :: DeveloperApi ::
@@ -28,27 +29,8 @@ import org.apache.spark.annotation.DeveloperApi
2829
*
2930
* @param timestamp the time the metrics were collected, or -1 for Spark history
3031
* log events which are logged when a stage has completed
31-
* @param jvmUsedHeapMemory the amount of JVM used heap memory for the executor
32-
* @param jvmUsedNonHeapMemory the amount of JVM used non-heap memory for the executor
33-
* @param onHeapExecutionMemory the amount of on heap execution memory used
34-
* @param offHeapExecutionMemory the amount of off heap execution memory used
35-
* @param onHeapStorageMemory the amount of on heap storage memory used
36-
* @param offHeapStorageMemory the amount of off heap storage memory used
37-
* @param onHeapUnifiedMemory the amount of on heap unified region memory used
38-
* @param offHeapUnifiedMemory the amount of off heap unified region memory used
39-
* @param directMemory the amount of direct memory used
40-
* @param mappedMemory the amount of mapped memory used
4132
*/
4233
@DeveloperApi
43-
class ExecutorMetrics private[spark] (
44-
val timestamp: Long,
45-
val jvmUsedHeapMemory: Long,
46-
val jvmUsedNonHeapMemory: Long,
47-
val onHeapExecutionMemory: Long,
48-
val offHeapExecutionMemory: Long,
49-
val onHeapStorageMemory: Long,
50-
val offHeapStorageMemory: Long,
51-
val onHeapUnifiedMemory: Long,
52-
val offHeapUnifiedMemory: Long,
53-
val directMemory: Long,
54-
val mappedMemory: Long) extends Serializable
34+
class ExecutorMetrics private[spark] (val timestamp: Long) extends Serializable {
35+
val metrics = new Array[Long](MetricGetter.values.length)
36+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.metrics
18+
19+
import java.lang.management.{BufferPoolMXBean, ManagementFactory}
20+
import javax.management.ObjectName
21+
22+
import org.apache.spark.memory.MemoryManager
23+
24+
sealed trait MetricGetter {
25+
def getMetricValue(memoryManager: MemoryManager): Long
26+
val name = getClass().getName().stripSuffix("$")
27+
}
28+
29+
abstract class MemoryManagerMetricGetter(f: MemoryManager => Long) extends MetricGetter {
30+
override def getMetricValue(memoryManager: MemoryManager): Long = {
31+
f(memoryManager)
32+
}
33+
}
34+
35+
abstract class MBeanMetricGetter(mBeanName: String) extends MetricGetter {
36+
val bean = ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer,
37+
new ObjectName(mBeanName).toString, classOf[BufferPoolMXBean])
38+
39+
override def getMetricValue(memoryManager: MemoryManager): Long = {
40+
bean.getMemoryUsed
41+
}
42+
}
43+
44+
case object JVMHeapMemory extends MetricGetter {
45+
override def getMetricValue(memoryManager: MemoryManager): Long = {
46+
ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed()
47+
}
48+
}
49+
50+
case object JVMOffHeapMemory extends MetricGetter {
51+
override def getMetricValue(memoryManager: MemoryManager): Long = {
52+
ManagementFactory.getMemoryMXBean.getNonHeapMemoryUsage().getUsed()
53+
}
54+
}
55+
56+
case object OnHeapExecution extends MemoryManagerMetricGetter(_.onHeapExecutionMemoryUsed)
57+
58+
case object OffHeapExecution extends MemoryManagerMetricGetter(_.offHeapExecutionMemoryUsed)
59+
60+
case object OnHeapStorage extends MemoryManagerMetricGetter(_.onHeapStorageMemoryUsed)
61+
62+
case object OffHeapStorage extends MemoryManagerMetricGetter(_.offHeapStorageMemoryUsed)
63+
64+
case object DirectPoolMemory extends MBeanMetricGetter("java.nio:type=BufferPool,name=direct")
65+
case object MappedPoolMemory extends MBeanMetricGetter("java.nio:type=BufferPool,name=mapped")
66+
67+
object MetricGetter {
68+
val values = IndexedSeq(
69+
JVMHeapMemory,
70+
JVMOffHeapMemory,
71+
OnHeapExecution,
72+
OffHeapExecution,
73+
OnHeapStorage,
74+
OffHeapStorage,
75+
DirectPoolMemory,
76+
MappedPoolMemory
77+
)
78+
79+
val idxAndValues = values.zipWithIndex.map(_.swap)
80+
}

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -214,12 +214,6 @@ class DAGScheduler(
214214
private val heartbeater: Heartbeater = new Heartbeater(reportHeartBeat, "driver-heartbeater",
215215
sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
216216

217-
/** BufferPoolMXBean for direct memory */
218-
private val directBufferPool = Executor.getBufferPool(Executor.DIRECT_BUFFER_POOL_NAME)
219-
220-
/** BufferPoolMXBean for mapped memory */
221-
private val mappedBufferPool = Executor.getBufferPool(Executor.MAPPED_BUFFER_POOL_NAME)
222-
223217
/**
224218
* Called by the TaskSetManager to report task's starting.
225219
*/
@@ -1772,8 +1766,7 @@ class DAGScheduler(
17721766
/** Reports heartbeat metrics for the driver. */
17731767
private def reportHeartBeat(): Unit = {
17741768
// get driver memory metrics
1775-
val driverUpdates = Executor.getCurrentExecutorMetrics(
1776-
sc.env.memoryManager, directBufferPool, mappedBufferPool)
1769+
val driverUpdates = Executor.getCurrentExecutorMetrics(sc.env.memoryManager)
17771770
val accumUpdates = new Array[(Long, Int, Int, Seq[AccumulableInfo])](0)
17781771
listenerBus.post(SparkListenerExecutorMetricsUpdate("driver", accumUpdates,
17791772
Some(driverUpdates)))

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -197,12 +197,9 @@ logInfo("spark.eventLog.logExecutorMetricsUpdates.enabled is " + shouldLogExecut
197197
executorMap.foreach {
198198
executorEntry => {
199199
for ((executorId, peakExecutorMetrics) <- executorEntry) {
200-
val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.jvmUsedHeapMemory,
201-
peakExecutorMetrics.jvmUsedNonHeapMemory, peakExecutorMetrics.onHeapExecutionMemory,
202-
peakExecutorMetrics.offHeapExecutionMemory, peakExecutorMetrics.onHeapStorageMemory,
203-
peakExecutorMetrics.offHeapStorageMemory, peakExecutorMetrics.onHeapUnifiedMemory,
204-
peakExecutorMetrics.offHeapUnifiedMemory, peakExecutorMetrics.directMemory,
205-
peakExecutorMetrics.mappedMemory)
200+
val executorMetrics = new ExecutorMetrics(-1)
201+
System.arraycopy(peakExecutorMetrics.metrics, 0, executorMetrics.metrics, 0,
202+
peakExecutorMetrics.metrics.size)
206203
val executorUpdate = new SparkListenerExecutorMetricsUpdate(
207204
executorId, accumUpdates, Some(executorMetrics))
208205
logEvent(executorUpdate)

core/src/main/scala/org/apache/spark/scheduler/PeakExecutorMetrics.scala

Lines changed: 19 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -18,43 +18,16 @@
1818
package org.apache.spark.scheduler
1919

2020
import org.apache.spark.executor.ExecutorMetrics
21+
import org.apache.spark.metrics.MetricGetter
2122
import org.apache.spark.status.api.v1.PeakMemoryMetrics
2223

2324
/**
2425
* Records the peak values for executor level metrics. If metrics(0) (JVM heap memory) is -1, then no
2526
* values have been recorded yet.
2627
*/
2728
private[spark] class PeakExecutorMetrics {
28-
private var _jvmUsedHeapMemory = -1L;
29-
private var _jvmUsedNonHeapMemory = 0L;
30-
private var _onHeapExecutionMemory = 0L
31-
private var _offHeapExecutionMemory = 0L
32-
private var _onHeapStorageMemory = 0L
33-
private var _offHeapStorageMemory = 0L
34-
private var _onHeapUnifiedMemory = 0L
35-
private var _offHeapUnifiedMemory = 0L
36-
private var _directMemory = 0L
37-
private var _mappedMemory = 0L
38-
39-
def jvmUsedHeapMemory: Long = _jvmUsedHeapMemory
40-
41-
def jvmUsedNonHeapMemory: Long = _jvmUsedNonHeapMemory
42-
43-
def onHeapExecutionMemory: Long = _onHeapExecutionMemory
44-
45-
def offHeapExecutionMemory: Long = _offHeapExecutionMemory
46-
47-
def onHeapStorageMemory: Long = _onHeapStorageMemory
48-
49-
def offHeapStorageMemory: Long = _offHeapStorageMemory
50-
51-
def onHeapUnifiedMemory: Long = _onHeapUnifiedMemory
52-
53-
def offHeapUnifiedMemory: Long = _offHeapUnifiedMemory
54-
55-
def directMemory: Long = _directMemory
56-
57-
def mappedMemory: Long = _mappedMemory
29+
val metrics = new Array[Long](MetricGetter.values.length)
30+
metrics(0) = -1
5831

5932
/**
6033
* Compare the specified memory values with the saved peak executor memory
@@ -66,47 +39,13 @@ private[spark] class PeakExecutorMetrics {
6639
def compareAndUpdate(executorMetrics: ExecutorMetrics): Boolean = {
6740
var updated: Boolean = false
6841

69-
if (executorMetrics.jvmUsedHeapMemory > _jvmUsedHeapMemory) {
70-
_jvmUsedHeapMemory = executorMetrics.jvmUsedHeapMemory
71-
updated = true
72-
}
73-
if (executorMetrics.jvmUsedNonHeapMemory > _jvmUsedNonHeapMemory) {
74-
_jvmUsedNonHeapMemory = executorMetrics.jvmUsedNonHeapMemory
75-
updated = true
42+
(0 until MetricGetter.values.length).foreach { metricIdx =>
43+
val newVal = executorMetrics.metrics(metricIdx)
44+
if ( newVal > metrics(metricIdx)) {
45+
updated = true
46+
metrics(metricIdx) = newVal
47+
}
7648
}
77-
if (executorMetrics.onHeapExecutionMemory > _onHeapExecutionMemory) {
78-
_onHeapExecutionMemory = executorMetrics.onHeapExecutionMemory
79-
updated = true
80-
}
81-
if (executorMetrics.offHeapExecutionMemory > _offHeapExecutionMemory) {
82-
_offHeapExecutionMemory = executorMetrics.offHeapExecutionMemory
83-
updated = true
84-
}
85-
if (executorMetrics.onHeapStorageMemory > _onHeapStorageMemory) {
86-
_onHeapStorageMemory = executorMetrics.onHeapStorageMemory
87-
updated = true
88-
}
89-
if (executorMetrics.offHeapStorageMemory > _offHeapStorageMemory) {
90-
_offHeapStorageMemory = executorMetrics.offHeapStorageMemory
91-
updated = true
92-
}
93-
if (executorMetrics.onHeapUnifiedMemory > _onHeapUnifiedMemory) {
94-
_onHeapUnifiedMemory = executorMetrics.onHeapUnifiedMemory
95-
updated = true
96-
}
97-
if (executorMetrics.offHeapUnifiedMemory > _offHeapUnifiedMemory) {
98-
_offHeapUnifiedMemory = executorMetrics.offHeapUnifiedMemory
99-
updated = true
100-
}
101-
if (executorMetrics.directMemory > _directMemory) {
102-
_directMemory = executorMetrics.directMemory
103-
updated = true
104-
}
105-
if (executorMetrics.mappedMemory > _mappedMemory) {
106-
_mappedMemory = executorMetrics.mappedMemory
107-
updated = true
108-
}
109-
11049
updated
11150
}
11251

@@ -115,13 +54,18 @@ private[spark] class PeakExecutorMetrics {
11554
* values set.
11655
*/
11756
def getPeakMemoryMetrics: Option[PeakMemoryMetrics] = {
118-
if (_jvmUsedHeapMemory < 0) {
57+
if (metrics(0) < 0) {
11958
None
12059
} else {
121-
Some(new PeakMemoryMetrics(_jvmUsedHeapMemory, _jvmUsedNonHeapMemory,
122-
_onHeapExecutionMemory, _offHeapExecutionMemory, _onHeapStorageMemory,
123-
_offHeapStorageMemory, _onHeapUnifiedMemory, _offHeapUnifiedMemory,
124-
_directMemory, _mappedMemory))
60+
val copy = new PeakMemoryMetrics
61+
System.arraycopy(this.metrics, 0, copy.metrics, 0, this.metrics.length)
62+
Some(copy)
12563
}
12664
}
65+
66+
/** Clears/resets the saved peak values. */
67+
def reset(): Unit = {
68+
(0 until metrics.length).foreach { idx => metrics(idx) = 0}
69+
metrics(0) = -1
70+
}
12771
}

0 commit comments

Comments
 (0)