@@ -28,20 +28,20 @@ import scala.collection.mutable.Queue
28
28
import org .apache .spark .SparkEnv
29
29
import org .apache .spark .internal .{config , Logging }
30
30
31
/**
 * Immutable snapshot of six memory totals, grouped into JVM, Python and "other" categories.
 *
 * NOTE(review): presumably each pair holds the virtual-memory and resident-set totals summed
 * over the processes of that category; the units of the RSS fields (pages vs. bytes) are not
 * established by this definition -- confirm against the code that builds the snapshot.
 *
 * @param jvmVmemTotal virtual memory total for the JVM category
 * @param jvmRSSTotal RSS total for the JVM category
 * @param pythonVmemTotal virtual memory total for the Python category
 * @param pythonRSSTotal RSS total for the Python category
 * @param otherVmemTotal virtual memory total for the remaining category
 * @param otherRSSTotal RSS total for the remaining category
 */
private[spark] case class ProcfsBasedSystemsMetrics(
    jvmVmemTotal: Long,
    jvmRSSTotal: Long,
    pythonVmemTotal: Long,
    pythonRSSTotal: Long,
    otherVmemTotal: Long,
    otherRSSTotal: Long)
37
38
38
39
// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop
39
40
// project.
40
- private [spark] class ProcfsBasedSystems extends Logging {
41
- var procfsDir = " /proc/"
41
+ private [spark] class ProcfsBasedSystems (procfsDir : String = " /proc/" ) extends Logging {
42
42
val procfsStatFile = " stat"
43
- var pageSize = 0
44
- var isAvailable : Boolean = isItProcfsBased
43
+ var pageSize : Long = computePageSize()
44
+ var isAvailable : Boolean = isProcfsAvailable
45
45
private val pid : Int = computePid()
46
46
private val ptree : scala.collection.mutable.Map [ Int , Set [Int ]] =
47
47
scala.collection.mutable.Map [ Int , Set [Int ]]()
@@ -56,7 +56,7 @@ private[spark] class ProcfsBasedSystems extends Logging {
56
56
57
57
computeProcessTree()
58
58
59
- private def isItProcfsBased : Boolean = {
59
+ private def isProcfsAvailable : Boolean = {
60
60
val testing = sys.env.contains(" SPARK_TESTING" ) || sys.props.contains(" spark.testing" )
61
61
if (testing) {
62
62
return true
@@ -92,33 +92,36 @@ private[spark] class ProcfsBasedSystems extends Logging {
92
92
}
93
93
catch {
94
94
case e : IOException => logDebug(" IO Exception when trying to compute process tree." +
95
- " As a result reporting of ProcessTree metrics is stopped" )
95
+ " As a result reporting of ProcessTree metrics is stopped" , e )
96
96
isAvailable = false
97
97
return - 1
98
- case _ => logDebug(" Some exception occurred when trying to compute process tree. " +
99
- " As a result reporting of ProcessTree metrics is stopped" )
98
+ case t : Throwable => logDebug(" Some exception occurred when trying to" +
99
+ " compute process tree. As a result reporting of ProcessTree metrics is stopped" , t )
100
100
isAvailable = false
101
101
return - 1
102
102
}
103
103
}
104
104
105
/**
 * Determines the system page size by running `getconf PAGESIZE`.
 *
 * When the `SPARK_TESTING` environment variable or the `spark.testing` system property is
 * set, no external command is run and 0 is returned.
 *
 * @return the value printed by `getconf PAGESIZE`, or 0 in testing mode
 * @throws java.io.IOException if the command cannot be started or its output cannot be read
 * @throws NumberFormatException if the command output is not a valid integer
 */
private def computePageSize(): Long = {
  val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
  if (testing) {
    0L
  } else {
    val process = Runtime.getRuntime.exec(Array("getconf", "PAGESIZE"))
    val stdout = process.getInputStream
    try {
      // Read until end-of-stream: a single read() into a fixed-size buffer may return only
      // part of the output and silently truncates anything longer than the buffer.
      val output = scala.io.Source.fromInputStream(stdout, "UTF-8").mkString
      // Parse as Long directly; the result type is Long, so there is no reason to go
      // through Int first.
      output.trim.toLong
    } finally {
      // Release the pipe and the child process even when reading or parsing fails.
      stdout.close()
      process.destroy()
    }
  }
}
111
115
112
116
private def computeProcessTree (): Unit = {
113
117
if (! isAvailable) {
114
118
return
115
119
}
116
- computePageSize
117
120
val queue : Queue [Int ] = new Queue [Int ]()
118
121
queue += pid
119
122
while ( ! queue.isEmpty ) {
120
123
val p = queue.dequeue()
121
- val c = getChildPIds (p)
124
+ val c = getChildPids (p)
122
125
if (! c.isEmpty) {
123
126
queue ++= c
124
127
ptree += (p -> c.toSet)
@@ -129,7 +132,7 @@ private[spark] class ProcfsBasedSystems extends Logging {
129
132
}
130
133
}
131
134
132
- private def getChildPIds (pid : Int ): ArrayBuffer [Int ] = {
135
+ private def getChildPids (pid : Int ): ArrayBuffer [Int ] = {
133
136
try {
134
137
val cmd = Array (" pgrep" , " -P" , pid.toString)
135
138
val input = Runtime .getRuntime.exec(cmd).getInputStream
@@ -150,23 +153,23 @@ private[spark] class ProcfsBasedSystems extends Logging {
150
153
childPidsInInt
151
154
} catch {
152
155
case e : IOException => logDebug(" IO Exception when trying to compute process tree." +
153
- " As a result reporting of ProcessTree metrics is stopped" )
156
+ " As a result reporting of ProcessTree metrics is stopped" , e )
154
157
isAvailable = false
155
158
return new mutable.ArrayBuffer ()
156
- case _ => logDebug(" Some exception occurred when trying to compute process tree." +
157
- " As a result reporting of ProcessTree metrics is stopped" )
159
+ case t : Throwable => logDebug(" Some exception occurred when trying to compute process tree." +
160
+ " As a result reporting of ProcessTree metrics is stopped" , t )
158
161
isAvailable = false
159
162
return new mutable.ArrayBuffer ()
160
163
}
161
164
}
162
165
163
- /**
166
+ def getProcessInfo (pid : Int ): Unit = {
167
+ /*
164
168
* Hadoop ProcfsBasedProcessTree class used regex and pattern matching to retrieve the memory
165
169
* info. I tried that but found it not correct during tests, so I used normal string analysis
166
170
* instead. The computation of RSS and Vmem are based on proc(5):
167
171
* http://man7.org/linux/man-pages/man5/proc.5.html
168
172
*/
169
- def getProcessInfo (pid : Int ): Unit = {
170
173
try {
171
174
val pidDir : File = new File (procfsDir, pid.toString)
172
175
val fReader = new InputStreamReader (
@@ -178,20 +181,23 @@ private[spark] class ProcfsBasedSystems extends Logging {
178
181
fReader.close
179
182
val procInfoSplit = procInfo.split(" " )
180
183
if ( procInfoSplit != null ) {
184
+ val vmem = procInfoSplit(22 ).toLong
185
+ val rssPages = procInfoSplit(23 ).toLong
181
186
if (procInfoSplit(1 ).toLowerCase.contains(" java" )) {
182
- latestJVMVmemTotal += procInfoSplit( 22 ).toLong
183
- latestJVMRSSTotal += procInfoSplit( 23 ).toLong
187
+ latestJVMVmemTotal += vmem
188
+ latestJVMRSSTotal += rssPages
184
189
}
185
190
else if (procInfoSplit(1 ).toLowerCase.contains(" python" )) {
186
- latestPythonVmemTotal += procInfoSplit( 22 ).toLong
187
- latestPythonRSSTotal += procInfoSplit( 23 ).toLong
191
+ latestPythonVmemTotal += vmem
192
+ latestPythonRSSTotal += rssPages
188
193
}
189
194
else {
190
- latestOtherVmemTotal += procInfoSplit( 22 ).toLong
191
- latestOtherRSSTotal += procInfoSplit( 23 ).toLong }
195
+ latestOtherVmemTotal += vmem
196
+ latestOtherRSSTotal += rssPages }
192
197
}
193
198
} catch {
194
- case f : FileNotFoundException =>
199
+ case f : FileNotFoundException => log.debug(" There was a problem with reading" +
200
+ " the stat file of the process" , f)
195
201
}
196
202
}
197
203
0 commit comments