File tree Expand file tree Collapse file tree 2 files changed +6
-9
lines changed
main/scala/org/apache/spark/deploy
test/scala/org/apache/spark/metrics Expand file tree Collapse file tree 2 files changed +6
-9
lines changed Original file line number Diff line number Diff line change @@ -149,16 +149,15 @@ class SparkHadoopUtil extends Logging {
149
149
val f = () => FileSystem .getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum
150
150
val baseline = (Thread .currentThread().getId, f())
151
151
152
+ /**
153
+ * This function may be called in both spawned child threads and parent task thread (in
154
+ * PythonRDD), and Hadoop FileSystem uses thread local variables to track the statistics.
155
+ * So we need a map to track the bytes read from the child threads and parent thread,
156
+ * summing them together to get the bytes read of this task.
157
+ */
152
158
new Function0 [Long ] {
153
159
private val bytesReadMap = new mutable.HashMap [Long , Long ]()
154
160
155
- /**
156
- * Returns a function that can be called to calculate Hadoop FileSystem bytes read.
157
- * This function may be called in both spawned child threads and parent task thread (in
158
- * PythonRDD), and Hadoop FileSystem uses thread local variables to track the statistics.
159
- * So we need a map to track the bytes read from the child threads and parent thread,
160
- * summing them together to get the bytes read of this task.
161
- */
162
161
override def apply (): Long = {
163
162
bytesReadMap.synchronized {
164
163
bytesReadMap.put(Thread .currentThread().getId, f())
Original file line number Diff line number Diff line change @@ -331,7 +331,6 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
331
331
buf.iterator
332
332
}.count()
333
333
}
334
- assert(bytesRead != 0 )
335
334
assert(bytesRead >= tmpFile.length())
336
335
}
337
336
@@ -347,7 +346,6 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
347
346
buf.iterator
348
347
}.count()
349
348
}
350
- assert(bytesRead != 0 )
351
349
assert(bytesRead >= tmpFile.length())
352
350
}
353
351
}
You can’t perform that action at this time.
0 commit comments