[SPARK-24958][CORE] Add memory from procfs to executor metrics. #22612

Status: Closed (27 commits)

Commits
3f8321a
Integration of ProcessTreeMetrics with PR 21221
Jul 26, 2018
cd16a75
Changing the position of ptree and also make the computation configur…
Aug 7, 2018
94c2b04
Separate metrics for jvm, python and others and update the tests
Aug 8, 2018
062f5d7
Update JsonProtocolSuite
Sep 25, 2018
245221d
[SPARK-24958] Add executors' process tree total memory information to…
Oct 2, 2018
c72be03
Addressing most of Imran's comments
Oct 3, 2018
8f3c938
Fixing the scala style and some minor comments
Oct 3, 2018
f2dca27
Removing types from the definitions wherever possible
Oct 4, 2018
a9f924c
Using Utils methods when possible or use ProcessBuilder
Oct 5, 2018
a11e3a2
Make use of Utils.tryWithResources
Oct 5, 2018
34ad625
Changing ExecutorMetricType and ExecutorMetrics to use a map instead o…
Oct 9, 2018
415f976
Changing ExecutorMetric to use array instead of a map
Oct 10, 2018
067b81d
A small cosmetic change
Oct 10, 2018
18ee4ad
Merge branch 'master' of https://github.com/apache/spark into ptreeme…
Oct 17, 2018
7f7ed2b
Applying latest review comments. Using Arrays instead of Map for ret…
Oct 23, 2018
f3867ff
Merge branch 'master' of https://github.com/apache/spark into ptreeme…
Nov 5, 2018
0f8f3e2
Fix an issue with JsonProtocolSuite
Nov 5, 2018
ea08c61
Fix scalastyle issue
Nov 5, 2018
8f20857
Applying latest review comments
Nov 14, 2018
6e65360
Using the companion object and other stuff
Nov 27, 2018
4659f4a
Update the use of process builder and applying other review comments
Nov 28, 2018
ef4be38
Small style fixes based on reviews
Nov 30, 2018
805741c
Applying review comments, mostly style related
Nov 30, 2018
4c1f073
Remove the unnecessary tryWithResources
Nov 30, 2018
0a7402e
Applying the comment about error handling and some more style fixes
Dec 4, 2018
3d65b35
Removing a return
Dec 6, 2018
6eab315
Reordering of info in a test resource file to avoid confusion
Dec 6, 2018
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/Heartbeater.scala
@@ -61,10 +61,17 @@ private[spark] class Heartbeater(

/**
 * Get the current executor level metrics. These are returned as an array, with the index
-  * determined by ExecutorMetricType.values
+  * determined by ExecutorMetricType.metricToOffset
*/
  def getCurrentMetrics(): ExecutorMetrics = {
-   val metrics = ExecutorMetricType.values.map(_.getMetricValue(memoryManager)).toArray
+   val metrics = new Array[Long](ExecutorMetricType.numMetrics)
+   var offset = 0
+   ExecutorMetricType.metricGetters.foreach { metric =>
+     val newMetrics = metric.getMetricValues(memoryManager)
+     Array.copy(newMetrics, 0, metrics, offset, newMetrics.size)
+     offset += newMetrics.length
+   }
new ExecutorMetrics(metrics)
}
}
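For context, the loop above assumes the ExecutorMetricType companion object (changed elsewhere in this PR, not shown in this hunk) exposes numMetrics, metricToOffset, and a metricGetters sequence in which a single getter may report several values. A minimal sketch of that shape, with hypothetical names (MetricGetter, MetricRegistry) standing in for the real definitions:

```scala
// Hypothetical sketch only: the real code lives in ExecutorMetricType.scala,
// and its getters also take a MemoryManager argument.
trait MetricGetter {
  def names: Seq[String]              // names of the metrics this getter reports
  def getMetricValues(): Array[Long]  // one value per name, in the same order
}

object MetricRegistry {
  val metricGetters: IndexedSeq[MetricGetter] = IndexedSeq.empty // populated in real code

  // Flattened (metric name -> offset into the combined metrics array).
  val metricToOffset: Map[String, Int] =
    metricGetters.foldLeft((Map.empty[String, Int], 0)) { case ((acc, offset), g) =>
      (acc ++ g.names.zipWithIndex.map { case (n, i) => n -> (offset + i) },
        offset + g.names.length)
    }._1

  val numMetrics: Int = metricGetters.map(_.names.length).sum
}
```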
23 changes: 10 additions & 13 deletions core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala
@@ -27,17 +27,15 @@ import org.apache.spark.metrics.ExecutorMetricType
*/
@DeveloperApi
class ExecutorMetrics private[spark] extends Serializable {

- // Metrics are indexed by ExecutorMetricType.values
- private val metrics = new Array[Long](ExecutorMetricType.values.length)
+ // Metrics are indexed by ExecutorMetricType.metricToOffset
+ private val metrics = new Array[Long](ExecutorMetricType.numMetrics)
  // the first element is initialized to -1, indicating that the values for the array
  // haven't been set yet.
  metrics(0) = -1

- /** Returns the value for the specified metricType. */
- def getMetricValue(metricType: ExecutorMetricType): Long = {
-   metrics(ExecutorMetricType.metricIdxMap(metricType))
+ /** Returns the value for the specified metric. */
+ def getMetricValue(metricName: String): Long = {
+   metrics(ExecutorMetricType.metricToOffset(metricName))
}

/** Returns true if the values for the metrics have been set, false otherwise. */
@@ -49,14 +49,14 @@ class ExecutorMetrics private[spark] extends Serializable {
}

/**
-  * Constructor: create the ExecutorMetrics with the values specified.
+  * Constructor: create the ExecutorMetrics using a given map.
  *
  * @param executorMetrics map of executor metric name to value
  */
 private[spark] def this(executorMetrics: Map[String, Long]) {
   this()
-  (0 until ExecutorMetricType.values.length).foreach { idx =>
-    metrics(idx) = executorMetrics.getOrElse(ExecutorMetricType.values(idx).name, 0L)
+  ExecutorMetricType.metricToOffset.foreach { case (name, idx) =>
+    metrics(idx) = executorMetrics.getOrElse(name, 0L)
}
}

@@ -69,9 +69,8 @@ class ExecutorMetrics private[spark] extends Serializable {
*/
private[spark] def compareAndUpdatePeakValues(executorMetrics: ExecutorMetrics): Boolean = {
var updated = false

-  (0 until ExecutorMetricType.values.length).foreach { idx =>
+  (0 until ExecutorMetricType.numMetrics).foreach { idx =>
     if (executorMetrics.metrics(idx) > metrics(idx)) {
updated = true
metrics(idx) = executorMetrics.metrics(idx)
}
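As a standalone illustration of the peak semantics above (plain arrays instead of the private[spark] ExecutorMetrics internals; a sketch, not Spark API):

```scala
// Mirrors the compareAndUpdatePeakValues logic on plain arrays to show
// the element-wise peak tracking; runnable outside Spark.
object PeakUpdateDemo {
  def compareAndUpdatePeaks(peaks: Array[Long], current: Array[Long]): Boolean = {
    var updated = false
    for (idx <- peaks.indices) {
      if (current(idx) > peaks(idx)) {
        updated = true
        peaks(idx) = current(idx)
      }
    }
    updated
  }

  def main(args: Array[String]): Unit = {
    val peaks = Array(100L, 250L, 0L)
    val changed = compareAndUpdatePeaks(peaks, Array(120L, 200L, 50L))
    println(s"changed=$changed, peaks=${peaks.mkString(", ")}") // true; 120, 250, 50
  }
}
```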
228 changes: 228 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
@@ -0,0 +1,228 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.executor

import java.io._
import java.nio.charset.Charset
import java.nio.file.{Files, Paths}
import java.util.Locale

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.Try

import org.apache.spark.{SparkEnv, SparkException}
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.util.Utils


private[spark] case class ProcfsMetrics(
jvmVmemTotal: Long,
jvmRSSTotal: Long,
pythonVmemTotal: Long,
pythonRSSTotal: Long,
otherVmemTotal: Long,
otherRSSTotal: Long)

// Some of the ideas here are taken from the ProcfsBasedProcessTree class
// in the Hadoop project.
private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends Logging {
private val procfsStatFile = "stat"
private val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
private val pageSize = computePageSize()
private var isAvailable: Boolean = isProcfsAvailable
private val pid = computePid()

private lazy val isProcfsAvailable: Boolean = {
if (testing) {
true
}
else {
val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover {
case ioe: IOException =>
logWarning("Exception checking for procfs dir", ioe)
false
}
val shouldLogStageExecutorMetrics =
SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
val shouldLogStageExecutorProcessTreeMetrics =
SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics
}
}

private def computePid(): Int = {
if (!isAvailable || testing) {
return -1
}
try {
// This can be simplified in java9:
// https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
val cmd = Array("bash", "-c", "echo $PPID")
val out = Utils.executeAndGetOutput(cmd)
Integer.parseInt(out.split("\n")(0))
}
catch {
case e: SparkException =>
logWarning("Exception when trying to compute process tree." +
" As a result reporting of ProcessTree metrics is stopped", e)
isAvailable = false
-1
}
}
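As the comment in computePid notes, Java 9+ could avoid the `bash -c "echo $PPID"` round trip entirely; a sketch of that alternative (not what this PR does, since Spark still supported Java 8 at the time):

```scala
// Java 9+ only: the JVM can read its own pid directly, no child shell needed.
val myPid: Long = ProcessHandle.current().pid()
```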

private def computePageSize(): Long = {
if (testing) {
return 4096
}
try {
val cmd = Array("getconf", "PAGESIZE")
val out = Utils.executeAndGetOutput(cmd)
Integer.parseInt(out.split("\n")(0))
} catch {
case e: Exception =>
logWarning("Exception when trying to compute pagesize, as a" +
" result reporting of ProcessTree metrics is stopped")
isAvailable = false
0
}
}

private def computeProcessTree(): Set[Int] = {
if (!isAvailable || testing) {
return Set()
}
var ptree: Set[Int] = Set()
ptree += pid
val queue = mutable.Queue.empty[Int]
queue += pid
while (!queue.isEmpty) {
val p = queue.dequeue()
val c = getChildPids(p)
if (!c.isEmpty) {
queue ++= c
ptree ++= c.toSet
}
}
ptree
}
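The traversal above is a plain BFS rooted at the executor's own pid, with pgrep supplying each node's children. The same walk against a canned parent-to-children map (a self-contained sketch, no pgrep involved):

```scala
import scala.collection.mutable

// Toy computeProcessTree: pid 100 stands in for the executor and a Map
// lookup stands in for pgrep -P.
val childrenOf = Map(100 -> Seq(200, 201), 200 -> Seq(300))
var ptree: Set[Int] = Set(100)
val queue = mutable.Queue(100)
while (queue.nonEmpty) {
  val c = childrenOf.getOrElse(queue.dequeue(), Seq.empty)
  queue ++= c
  ptree ++= c
}
assert(ptree == Set(100, 200, 201, 300))
```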

private def getChildPids(pid: Int): ArrayBuffer[Int] = {
try {
val builder = new ProcessBuilder("pgrep", "-P", pid.toString)
val process = builder.start()
val childPidsInInt = mutable.ArrayBuffer.empty[Int]
def appendChildPid(s: String): Unit = {
if (s != "") {
logTrace("Found a child pid:" + s)
childPidsInInt += Integer.parseInt(s)
}
}
val stdoutThread = Utils.processStreamByLine("read stdout for pgrep",
process.getInputStream, appendChildPid)
val errorStringBuilder = new StringBuilder()
val stdErrThread = Utils.processStreamByLine(
"stderr for pgrep",
process.getErrorStream,
line => errorStringBuilder.append(line))
val exitCode = process.waitFor()
stdoutThread.join()
stdErrThread.join()
val errorString = errorStringBuilder.toString()
// pgrep will have an exit code of 1 if there is more than one child process,
// and an exit code of 2 if there is no child process
if (exitCode != 0 && exitCode > 2) {
val cmd = builder.command().toArray.mkString(" ")
logWarning(s"Process $cmd exited with code $exitCode and stderr: $errorString")
throw new SparkException(s"Process $cmd exited with code $exitCode")
}
childPidsInInt
} catch {
case e: Exception =>
logWarning("Exception when trying to compute process tree." +
" As a result reporting of ProcessTree metrics is stopped.", e)
isAvailable = false
mutable.ArrayBuffer.empty[Int]
}
}

def addProcfsMetricsFromOneProcess(
allMetrics: ProcfsMetrics,
pid: Int): ProcfsMetrics = {

// The computation of RSS and Vmem are based on proc(5):
// http://man7.org/linux/man-pages/man5/proc.5.html
try {
val pidDir = new File(procfsDir, pid.toString)
def openReader(): BufferedReader = {
  val f = new File(pidDir, procfsStatFile)
  new BufferedReader(new InputStreamReader(new FileInputStream(f), Charset.forName("UTF-8")))
}
Utils.tryWithResource(openReader()) { in =>
val procInfo = in.readLine
val procInfoSplit = procInfo.split(" ")
val vmem = procInfoSplit(22).toLong
val rssMem = procInfoSplit(23).toLong * pageSize
if (procInfoSplit(1).toLowerCase(Locale.US).contains("java")) {
allMetrics.copy(
jvmVmemTotal = allMetrics.jvmVmemTotal + vmem,
jvmRSSTotal = allMetrics.jvmRSSTotal + (rssMem)
)
}
else if (procInfoSplit(1).toLowerCase(Locale.US).contains("python")) {
allMetrics.copy(
pythonVmemTotal = allMetrics.pythonVmemTotal + vmem,
pythonRSSTotal = allMetrics.pythonRSSTotal + (rssMem)
)
}
else {
allMetrics.copy(
otherVmemTotal = allMetrics.otherVmemTotal + vmem,
otherRSSTotal = allMetrics.otherRSSTotal + (rssMem)
)
}
}
} catch {
case f: IOException =>
logWarning("There was a problem with reading" +
" the stat file of the process. ", f)
ProcfsMetrics(0, 0, 0, 0, 0, 0)
}
}
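Per proc(5), field 23 of /proc/[pid]/stat is vsize (in bytes) and field 24 is rss (in pages), hence the zero-based indices 22 and 23 above and the multiplication by the page size. A sketch against a fabricated stat line (values invented; 4 KiB pages assumed):

```scala
// Fabricated /proc/[pid]/stat line: pid, comm, state, ppid, 18 filler
// fields, then vsize (bytes) and rss (pages); real lines have more fields.
val statLine = "1234 (java) S 1 " + ("0 " * 18) + "1073741824 25600"
val fields = statLine.split(" ")
val vmemBytes = fields(22).toLong          // vsize = 1 GiB
val rssBytes  = fields(23).toLong * 4096L  // 25600 pages * 4 KiB = 100 MiB
println(s"vmem=$vmemBytes rss=$rssBytes")
```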

private[spark] def computeAllMetrics(): ProcfsMetrics = {
if (!isAvailable) {
return ProcfsMetrics(0, 0, 0, 0, 0, 0)
}
val pids = computeProcessTree()
var allMetrics = ProcfsMetrics(0, 0, 0, 0, 0, 0)
for (p <- pids) {
allMetrics = addProcfsMetricsFromOneProcess(allMetrics, p)
// if we had an error getting any of the metrics, we don't want to report partial metrics, as
// that would be misleading.
if (!isAvailable) {
return ProcfsMetrics(0, 0, 0, 0, 0, 0)
}
}
allMetrics
Contributor:

one weird thing about the error handling here -- if you get any error from computeProcessTree through all calls to addProcfsMetricsFromOneProcess, then you'll set isAvailable = false, but you'll still return allMetrics. Depending on when the error occurs, you might have some partially accumulated metrics in there. I think if there is any error, you probably want to return empty metrics.

Also you'll leave isAvailable = false forever after that -- is that OK? I guess I don't really have strong feelings about this one; maybe it's OK to give up forever for this executor.

Contributor Author:

It is possible to be more accurate here when an error happens, although I think I need to revert to an older version of the method signature, since right now the logic is tied together through allMetrics. I will do something tomorrow.
I don't want to introduce the overhead of checking at every step, but I can think about it and implement some sort of check after we set isAvailable = false because of an error.

Contributor:

for (p <- pids) {
  allMetrics = addProcfsMetricsFromOneProcess(allMetrics, p)
  // if we had an error getting any of the metrics, we don't want to report partial metrics, as
  // that would be misleading.
  if (!isAvailable) {
    return ProcfsMetrics(0, 0, 0, 0, 0, 0)
  }   
 }

or just after the loop, if you don't want to check at each iteration (though the overhead is pretty tiny compared to addProcfsMetricsFromOneProcess)

Contributor Author:

My comment about overhead was referring to your question about whether it is OK to keep isAvailable = false after an error. I guess we can go with the option of not setting isAvailable = false after an error in computing the process tree, and instead set a flag to check afterward and return zero in case of error, the way you suggested in your comment above. That way we won't stop reporting for this executor, and we only return zero in case of error. I'm fine with this as well.
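A sketch of that flag-based variant inside ProcfsMetricsGetter (hypothetical; the code as shown above instead returns empty metrics and leaves isAvailable false):

```scala
// Hypothetical alternative from this thread: record the failure for the
// current snapshot only, instead of disabling reporting permanently.
private[spark] def computeAllMetrics(): ProcfsMetrics = {
  if (!isAvailable) {
    return ProcfsMetrics(0, 0, 0, 0, 0, 0)
  }
  var hadError = false
  var allMetrics = ProcfsMetrics(0, 0, 0, 0, 0, 0)
  for (p <- computeProcessTree()) {
    allMetrics = addProcfsMetricsFromOneProcess(allMetrics, p)
    if (!isAvailable) {   // an error was flagged while reading this process
      hadError = true
      isAvailable = true  // re-enable reporting for the next snapshot
    }
  }
  if (hadError) ProcfsMetrics(0, 0, 0, 0, 0, 0) else allMetrics
}
```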

}
}

private[spark] object ProcfsMetricsGetter {
final val pTreeInfo = new ProcfsMetricsGetter
}
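For reference, a usage sketch of the shared instance (computeAllMetrics is private[spark] and isProcfsAvailable consults SparkEnv, so this only works from Spark-internal code inside a running application, or with SPARK_TESTING set):

```scala
package org.apache.spark.executor

// Sketch only: read one snapshot of process-tree memory and print the
// per-category totals from the shared getter.
object ProcfsDemo {
  def main(args: Array[String]): Unit = {
    val m = ProcfsMetricsGetter.pTreeInfo.computeAllMetrics()
    println(s"jvm:    vmem=${m.jvmVmemTotal} rss=${m.jvmRSSTotal}")
    println(s"python: vmem=${m.pythonVmemTotal} rss=${m.pythonRSSTotal}")
    println(s"other:  vmem=${m.otherVmemTotal} rss=${m.otherRSSTotal}")
  }
}
```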
core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -80,6 +80,11 @@ package object config {
.booleanConf
.createWithDefault(false)

+ private[spark] val EVENT_LOG_PROCESS_TREE_METRICS =
+   ConfigBuilder("spark.eventLog.logStageExecutorProcessTreeMetrics.enabled")
+     .booleanConf
+     .createWithDefault(false)

private[spark] val EVENT_LOG_OVERWRITE =
ConfigBuilder("spark.eventLog.overwrite").booleanConf.createWithDefault(false)

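Collection is gated on both event-log flags (see isProcfsAvailable above): the existing spark.eventLog.logStageExecutorMetrics.enabled and the new spark.eventLog.logStageExecutorProcessTreeMetrics.enabled. A sketch of enabling them on a SparkConf (key names as of this PR):

```scala
import org.apache.spark.SparkConf

// Process-tree metrics are collected only when executor-metrics logging
// and the new process-tree flag are both enabled.
val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.logStageExecutorMetrics.enabled", "true")
  .set("spark.eventLog.logStageExecutorProcessTreeMetrics.enabled", "true")
```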