
[SPARK-20244][Core] Handle incorrect bytesRead metrics when using PySpark #17617

Closed · wants to merge 5 commits
28 changes: 22 additions & 6 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -23,6 +23,7 @@ import java.text.DateFormat
import java.util.{Arrays, Comparator, Date, Locale}

import scala.collection.JavaConverters._
+import scala.collection.mutable
import scala.util.control.NonFatal

import com.google.common.primitives.Longs
@@ -143,14 +144,29 @@ class SparkHadoopUtil extends Logging {
* Returns a function that can be called to find Hadoop FileSystem bytes read. If
* getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will
* return the bytes read on r since t.
*
-  * @return None if the required method can't be found.
Contributor: Why remove this line instead of correcting the doc?

Contributor: This doesn't return a None; the doc is corrected to match the actual behavior.

*/
private[spark] def getFSBytesReadOnThreadCallback(): () => Long = {
Contributor: Let's update the doc to say that the returned function may be called from multiple threads.

-    val threadStats = FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics)
-    val f = () => threadStats.map(_.getBytesRead).sum
-    val baselineBytesRead = f()
-    () => f() - baselineBytesRead
+    val f = () => FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum
Contributor: Why are you changing this?

Contributor (Author): In the previous code, threadStats was captured on one thread while the f function could be executed on another; since getThreadStatistics is per-thread state, the metrics we got could be wrong (see the sketch after this hunk).

val baseline = (Thread.currentThread().getId, f())

    /**
     * This function may be called from both spawned child threads and the parent task thread
     * (as in PythonRDD), and Hadoop FileSystem uses thread-local variables to track its
     * statistics. So we need a map to track the bytes read from the child threads and the
     * parent thread, summing them together to get the total bytes read for this task.
     */
new Function0[Long] {
private val bytesReadMap = new mutable.HashMap[Long, Long]()

override def apply(): Long = {
bytesReadMap.synchronized {
bytesReadMap.put(Thread.currentThread().getId, f())
bytesReadMap.map { case (k, v) =>
v - (if (k == baseline._1) baseline._2 else 0)
}.sum
}
}
}
}

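To make the thread-local issue discussed above concrete, here is a minimal, self-contained sketch (not Spark code; ThreadLocalStatsDemo and the other names are hypothetical). A ThreadLocal counter stands in for Hadoop's per-thread FileSystem statistics: a callback that only reads the parent thread's counter misses a child thread's reads, while the per-thread map pattern from the patch records each calling thread's absolute reading and sums the deltas.

import scala.collection.mutable

object ThreadLocalStatsDemo {
  // Stands in for Hadoop's FileSystem.Statistics: each thread sees only its own counter.
  private val bytesReadCounter = new ThreadLocal[Long] {
    override protected def initialValue(): Long = 0L
  }

  private def simulateRead(n: Long): Unit =
    bytesReadCounter.set(bytesReadCounter.get + n)

  def main(args: Array[String]): Unit = {
    val f = () => bytesReadCounter.get // the calling thread's own reading
    val baseline = (Thread.currentThread().getId, f())
    val bytesReadMap = new mutable.HashMap[Long, Long]()

    // Same shape as the callback in the patch: record the calling thread's
    // absolute reading, then sum every thread's reading minus the baseline.
    val callback = () => bytesReadMap.synchronized {
      bytesReadMap.put(Thread.currentThread().getId, f())
      bytesReadMap.map { case (k, v) =>
        v - (if (k == baseline._1) baseline._2 else 0)
      }.sum
    }

    simulateRead(100) // a read on the parent thread
    val child = new Thread(new Runnable {
      override def run(): Unit = { simulateRead(42); callback() } // child flushes its reading
    })
    child.start()
    child.join()

    // Prints 142: the child's 42 bytes are included, which a callback reading
    // only the parent thread's thread-local counter would have missed.
    println(callback())
  }
}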
8 changes: 7 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -251,7 +251,13 @@ class HadoopRDD[K, V](
null
}
// Register an on-task-completion callback to close the input stream.
-    context.addTaskCompletionListener{ context => closeIfNeeded() }
+    context.addTaskCompletionListener { context =>
+      // Update the bytes read before closing to make sure lingering bytesRead statistics in
+      // this thread are correctly added.
+      updateBytesRead()
Contributor: Will this duplicate what we do in close()?

jerryshao (Author), May 27, 2017: close can be called from another thread, as I remember, so I added this here to avoid lingering bytesRead in the task-running thread (some bytes can be read while creating the InputFormat); it is also harmless to call updateBytesRead again (see the sketch after this hunk).

+      closeIfNeeded()
+    }

private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()

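On the point that it is harmless to call updateBytesRead again: because the callback stores an absolute per-thread reading (bytesReadMap.put) rather than accumulating a delta, flushing from the completion listener and then again from close() is idempotent. A tiny sketch of that property (hypothetical names, not Spark code):

import scala.collection.mutable

object IdempotentFlushDemo {
  private val readings = new mutable.HashMap[Long, Long]()

  // put(...) overwrites the thread's entry with its absolute reading; a `+=`
  // accumulation here would double-count when both the completion listener
  // and close() flush the same thread's statistics.
  private def flush(threadId: Long, absoluteBytesRead: Long): Long = {
    readings.put(threadId, absoluteBytesRead)
    readings.values.sum
  }

  def main(args: Array[String]): Unit = {
    println(flush(1L, 100L)) // 100
    println(flush(1L, 100L)) // still 100: the second flush changes nothing
  }
}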
8 changes: 7 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -191,7 +191,13 @@ class NewHadoopRDD[K, V](
}

// Register an on-task-completion callback to close the input stream.
-    context.addTaskCompletionListener(context => close())
+    context.addTaskCompletionListener { context =>
+      // Update the bytesRead before closing to make sure lingering bytesRead statistics in
+      // this thread are correctly added.
+      updateBytesRead()
+      close()
+    }

private var havePair = false
private var recordsSinceMetricsUpdate = 0

core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -34,7 +34,7 @@ import org.scalatest.BeforeAndAfter

import org.apache.spark.{SharedSparkContext, SparkFunSuite}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}

class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
with BeforeAndAfter {
@@ -319,6 +319,35 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
}
assert(bytesRead >= tmpFile.length())
}

test("input metrics with old Hadoop API in different thread") {
val bytesRead = runAndReturnBytesRead {
sc.textFile(tmpFilePath, 4).mapPartitions { iter =>
val buf = new ArrayBuffer[String]()
ThreadUtils.runInNewThread("testThread", false) {
iter.flatMap(_.split(" ")).foreach(buf.append(_))
}

buf.iterator
}.count()
}
assert(bytesRead >= tmpFile.length())
}

test("input metrics with new Hadoop API in different thread") {
val bytesRead = runAndReturnBytesRead {
sc.newAPIHadoopFile(tmpFilePath, classOf[NewTextInputFormat], classOf[LongWritable],
classOf[Text]).mapPartitions { iter =>
val buf = new ArrayBuffer[String]()
ThreadUtils.runInNewThread("testThread", false) {
iter.map(_._2.toString).flatMap(_.split(" ")).foreach(buf.append(_))
}

buf.iterator
}.count()
}
assert(bytesRead >= tmpFile.length())
}
}
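The runAndReturnBytesRead helper these tests call is defined outside the shown hunks. A plausible sketch of such a helper (a hypothetical reconstruction, not necessarily the suite's actual code): register a SparkListener, run the job, and total the bytesRead reported in task-end events.

private def runAndReturnBytesRead(job: => Unit): Long = {
  val bytesRead = new java.util.concurrent.atomic.AtomicLong(0L)
  sc.addSparkListener(new SparkListener {
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
      // In Spark 2.x, inputMetrics is always present on TaskMetrics.
      bytesRead.addAndGet(taskEnd.taskMetrics.inputMetrics.bytesRead)
    }
  })
  job
  // listenerBus is private[spark]; reachable here only because the suite
  // lives in an org.apache.spark package. Drain pending events first.
  sc.listenerBus.waitUntilEmpty(500)
  bytesRead.get()
}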
