[SPARK-20244][Core] Handle incorrect bytesRead metrics when using PySpark #17617
Changes from all commits
a23633d
8b16017
1e3fb8a
2b08b48
c068f43
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ import java.text.DateFormat
 import java.util.{Arrays, Comparator, Date, Locale}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.util.control.NonFatal
 
 import com.google.common.primitives.Longs
@@ -143,14 +144,29 @@ class SparkHadoopUtil extends Logging {
    * Returns a function that can be called to find Hadoop FileSystem bytes read. If
    * getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will
    * return the bytes read on r since t.
-   *
-   * @return None if the required method can't be found.
    */
   private[spark] def getFSBytesReadOnThreadCallback(): () => Long = {
Review comment: Let's update the doc comment to say that the returned function may be called in multiple threads.
-    val threadStats = FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics)
-    val f = () => threadStats.map(_.getBytesRead).sum
-    val baselineBytesRead = f()
-    () => f() - baselineBytesRead
+    val f = () => FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum
Review comment: Why are you changing this?
Reply: For the previous code,
+    val baseline = (Thread.currentThread().getId, f())
+
+    /**
+     * This function may be called in both spawned child threads and parent task thread (in
+     * PythonRDD), and Hadoop FileSystem uses thread local variables to track the statistics.
+     * So we need a map to track the bytes read from the child threads and parent thread,
+     * summing them together to get the bytes read of this task.
+     */
+    new Function0[Long] {
+      private val bytesReadMap = new mutable.HashMap[Long, Long]()
+
+      override def apply(): Long = {
+        bytesReadMap.synchronized {
+          bytesReadMap.put(Thread.currentThread().getId, f())
+          bytesReadMap.map { case (k, v) =>
+            v - (if (k == baseline._1) baseline._2 else 0)
+          }.sum
+        }
+      }
+    }
   }
 
   /**
|
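The per-thread bookkeeping in the hunk above can be tried without a Hadoop dependency. The following is a minimal sketch, not the code from this PR: `BytesReadSketch`, `threadBytes`, `simulateRead`, and `demo` are hypothetical names, and a plain `ThreadLocal` counter stands in for Hadoop's thread-local `FileSystem` statistics. It only illustrates why the callback keeps one map entry per calling thread and subtracts the baseline for the creating thread alone.

```scala
import scala.collection.mutable

object BytesReadSketch {
  // Hypothetical stand-in for Hadoop's thread-local FileSystem statistics.
  private val threadBytes = new ThreadLocal[Long] {
    override def initialValue(): Long = 0L
  }

  // Pretend the current thread read `n` more bytes.
  def simulateRead(n: Long): Unit = threadBytes.set(threadBytes.get() + n)

  // Same shape as the callback in the diff: remember the creating thread's
  // baseline, then record the latest counter of every thread that calls it.
  def bytesReadCallback(): () => Long = {
    val f = () => threadBytes.get()
    val baseline = (Thread.currentThread().getId, f())
    val bytesReadMap = new mutable.HashMap[Long, Long]()
    () => bytesReadMap.synchronized {
      bytesReadMap.put(Thread.currentThread().getId, f())
      bytesReadMap.map { case (k, v) =>
        // Only the creating thread has bytes that predate the callback.
        v - (if (k == baseline._1) baseline._2 else 0L)
      }.sum
    }
  }

  def demo(): Long = {
    simulateRead(100) // read before the callback exists; excluded by the baseline
    val callback = bytesReadCallback()
    simulateRead(10)  // parent thread reads 10 bytes
    val child = new Thread(new Runnable {
      override def run(): Unit = {
        simulateRead(32) // child thread reads 32 bytes
        callback()       // records the child's counter in the map
      }
    })
    child.start()
    child.join()
    callback() // parent's 10 bytes plus the child's 32 bytes
  }
}
```

In this sketch the child's bytes are counted only because the child itself calls the callback at least once; a thread that never calls it contributes nothing to the sum.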
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -251,7 +251,13 @@ class HadoopRDD[K, V](
           null
       }
       // Register an on-task-completion callback to close the input stream.
-      context.addTaskCompletionListener{ context => closeIfNeeded() }
+      context.addTaskCompletionListener { context =>
+        // Update the bytes read before closing is to make sure lingering bytesRead statistics in
+        // this thread get correctly added.
+        updateBytesRead()
Review comment: Will this duplicate with what we do in
+        closeIfNeeded()
+      }
 
       private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
       private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()
|
||
|
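The code comment in the hunk above says the extra `updateBytesRead()` call picks up "lingering" statistics before the reader is closed. A small sketch of that effect follows, under the assumption (not stated in this diff) that the metric is otherwise refreshed only every few records. `FinalUpdateSketch`, `Metrics`, `Reader`, and `interval` are hypothetical names used for illustration only.

```scala
object FinalUpdateSketch {
  final class Metrics { var bytesRead: Long = 0L }

  // Hypothetical reader that refreshes the metric only every `interval` records.
  final class Reader(recordSizes: Seq[Long], interval: Int, metrics: Metrics) {
    private var consumed = 0L
    private var records = 0
    private var closed = false

    def updateBytesRead(): Unit = { metrics.bytesRead = consumed }

    def readAll(): Unit = recordSizes.foreach { size =>
      consumed += size
      records += 1
      if (records % interval == 0) updateBytesRead()
    }

    def closeIfNeeded(): Unit = { closed = true }
  }

  // Simulates the completion listener with and without the final update.
  def run(finalUpdate: Boolean): Long = {
    val metrics = new Metrics
    val reader = new Reader(Seq(10L, 20L, 30L, 40L, 50L), interval = 2, metrics)
    reader.readAll()
    if (finalUpdate) reader.updateBytesRead() // flush the tail before closing
    reader.closeIfNeeded()
    metrics.bytesRead
  }
}
```

With five records and an interval of two, the last record falls after the final periodic refresh, so only the run with the final update reports all 150 bytes.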
Review comment: Why are you removing this line instead of the doc?
Reply: This doesn't return a None, but the doc is still correct about the behavior.