[SPARK-18406][CORE][Backport-2.1] Race between end-of-task and completion iterator read lock release #18099

Closed

Conversation

jiangxb1987
Contributor

This is a backport PR of #18076 to 2.1.

What changes were proposed in this pull request?

When a TaskContext is not propagated properly to all child threads of a task, as in the cases reported in this issue, we fail to obtain the TID from the TaskContext, so the read lock cannot be released and assertion failures follow. To resolve this, we have to pass the TID value to the unlock method explicitly.
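
To make the failure mode concrete, here is a minimal, self-contained sketch of the problem and the fix. It is not the actual `BlockManager`/`BlockInfoManager` code; the names (`SketchLockManager`, `unlockFromThreadLocal`) are made up, and a plain `ThreadLocal` stands in for `TaskContext`:

```scala
import scala.collection.mutable

object TidPropagationSketch {
  // Stand-in for TaskContext.get(): set only on the main task thread.
  private val taskContextTid = new ThreadLocal[Option[Long]] {
    override def initialValue(): Option[Long] = None
  }

  // Hypothetical lock manager; read locks are keyed by (blockId, taskAttemptId).
  class SketchLockManager {
    private val readLocks = mutable.Map.empty[(String, Long), Int].withDefaultValue(0)

    def lockForReading(blockId: String): Unit = synchronized {
      val tid = taskContextTid.get().getOrElse(sys.error("no task context on this thread"))
      readLocks((blockId, tid)) += 1
    }

    // Old behavior: re-derive the TID from the thread-local. On a child thread it is
    // empty, the lock entry cannot be found, and the assertion fires.
    def unlockFromThreadLocal(blockId: String): Unit = synchronized {
      val tid = taskContextTid.get().getOrElse(
        sys.error(s"assertion failed: Block $blockId is not locked for reading"))
      readLocks((blockId, tid)) -= 1
    }

    // Fixed behavior: the caller captures the TID on the main thread and passes it explicitly.
    def unlock(blockId: String, taskAttemptId: Long): Unit = synchronized {
      readLocks((blockId, taskAttemptId)) -= 1
    }
  }

  def main(args: Array[String]): Unit = {
    val locks = new SketchLockManager
    taskContextTid.set(Some(42L))               // the main task thread has a "TaskContext"
    locks.lockForReading("rdd_7_0")
    val capturedTid = taskContextTid.get().get  // captured eagerly, before work moves to a child thread

    val child = new Thread(new Runnable {
      override def run(): Unit = {
        // The "TaskContext" was not propagated to this thread, so the thread-local is empty:
        // locks.unlockFromThreadLocal("rdd_7_0")  // would fail the assertion
        locks.unlock("rdd_7_0", capturedTid)       // succeeds: the TID travels with the callback
      }
    })
    child.start()
    child.join()
    println("released read lock from a child thread")
  }
}
```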

How was this patch tested?

Added a new regression test case in `RDDSuite` that fails without this fix.

…read lock release

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes apache#18076 from jiangxb1987/completion-iterator.
@jiangxb1987
Contributor Author

cc @gatorsmile

@@ -454,14 +454,20 @@ private[spark] class BlockManager(
case Some(info) =>
val level = info.level
logDebug(s"Level for block $blockId is $level")
val taskAttemptId = Option(TaskContext.get()).map(_.taskAttemptId())
Contributor

Hi, @jiangxb1987. This looks like the same way of getting the taskAttemptId as BlockInfoManager uses. What is the difference between getting the taskAttemptId here and using BlockInfoManager.currentTaskAttemptId?

Contributor Author

This gets the taskAttemptId from the main task thread. If a TaskContext is not propagated properly to all child threads of the task, we would fail to get the taskAttemptId inside BlockInfoManager; see the test case added in this PR.

Contributor

Got it, thanks for your explanation.
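
To restate the distinction discussed in this thread as a tiny, self-contained sketch (a plain `ThreadLocal` stands in for `TaskContext`; this is not Spark code): a value looked up lazily inside the callback resolves on whatever thread happens to run the callback, while a value captured eagerly on the main task thread is closed over and survives the hand-off to a child thread.

```scala
object EagerCaptureSketch {
  // Stand-in for TaskContext.get(): set on the main task thread only.
  private val ctx = new ThreadLocal[String]

  def main(args: Array[String]): Unit = {
    ctx.set("task-attempt-0")

    // Lazy lookup: resolved when the callback runs, possibly on a child thread -> null there.
    val lazyCallback = new Runnable {
      override def run(): Unit = println(s"lazy lookup sees:  ${ctx.get()}")
    }

    // Eager capture: resolved now, on the main thread, then closed over by the callback.
    val captured = ctx.get()
    val eagerCallback = new Runnable {
      override def run(): Unit = println(s"eager capture sees: $captured")
    }

    val child = new Thread(new Runnable {
      override def run(): Unit = { lazyCallback.run(); eagerCallback.run() }
    })
    child.start()
    child.join()
    // Prints:
    //   lazy lookup sees:  null
    //   eager capture sees: task-attempt-0
  }
}
```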

@SparkQA

SparkQA commented May 25, 2017

Test build #77311 has finished for PR 18099 at commit aa59b1b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to 2.1!

asfgit pushed a commit that referenced this pull request May 25, 2017

Closes #18099 from jiangxb1987/completion-iterator-2.1.
jzhuge pushed a commit to jzhuge/spark that referenced this pull request Aug 20, 2018

Closes apache#18099 from jiangxb1987/completion-iterator-2.1.
@appleyuchi

Is this fix available in Spark 2.3.1? Thanks.

@appleyuchi

The following occurs when I run a lab with ALS in Spark:

8/08/22 21:24:14 ERROR Utils: Uncaught exception in thread stdout writer for python
java.lang.AssertionError: assertion failed: Block rdd_7_0 is not locked for reading
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)
at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:769)
at org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:540)
at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:213)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:407)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
Exception in thread "stdout writer for python" java.lang.AssertionError: assertion failed: Block rdd_7_0 is not locked for reading
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)
at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:769)
at org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:540)
at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:213)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:407)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)

@uncleGen
Contributor

Same issue in Spark 2.2.1.
