
[SPARK-19347] ReceiverSupervisorImpl can add block to ReceiverTracker multiple times because of askWithRetry. #16690


Closed

Conversation

jinxing64

@jinxing64 jinxing64 commented Jan 24, 2017

What changes were proposed in this pull request?

ReceiverSupervisorImpl on the executor side reports a block's metadata back to ReceiverTracker on the driver side. In the current code, askWithRetry is used. However, the handling of AddBlock in ReceiverTracker is not idempotent, so a retried message may be processed multiple times.
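
For illustration, here is a minimal sketch of the non-idempotency (the blocks buffer and handleAddBlock below are hypothetical stand-ins, not the actual ReceiverTracker code):

```
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the driver-side AddBlock handler: it simply
// appends, so the same message delivered twice is recorded twice.
case class AddBlock(streamId: Int, blockId: String)

object NonIdempotentHandlerDemo {
  private val blocks = ArrayBuffer.empty[AddBlock]

  def handleAddBlock(msg: AddBlock): Boolean = {
    blocks += msg // no de-duplication
    true
  }

  def main(args: Array[String]): Unit = {
    val msg = AddBlock(streamId = 0, blockId = "input-0-1485212400000")
    handleAddBlock(msg) // first attempt: reply is lost or times out on the caller side
    handleAddBlock(msg) // askWithRetry resends, and the same block is added again
    println(s"recorded ${blocks.size} entries for one block") // prints 2
  }
}
```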

To reproduce:

  1. In ReceiverTracker, check whether it is the first time AddBlock is received; if so, sleep long enough (say 200 seconds) so that the first RPC call times out in askWithRetry and AddBlock is resent.
  2. Rebuild Spark and run the following job:
```
  def streamProcessing(): Unit = {
    val conf = new SparkConf()
      .setAppName("StreamingTest")
      .setMaster(masterUrl)
    val ssc = new StreamingContext(conf, Seconds(200))
    val stream = ssc.socketTextStream("localhost", 1234)
    stream.print()
    ssc.start()
    ssc.awaitTermination()
  }
```

To fix:

It makes sense to provide a blocking version of ask in RpcEndpointRef, as mentioned in SPARK-18113 (#16503 (comment)), because the Netty RPC layer will not drop messages. askWithRetry is a leftover from the Akka days. It imposes restrictions on the caller (e.g. idempotency) and other things that people generally don't pay much attention to when using it.
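
Roughly, the blocking call wraps the asynchronous ask and awaits the reply once, with no resend; a minimal sketch follows (class and method names here are illustrative, the final API shape is discussed in the review below):

```
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag

// Sketch of a blocking ask built on top of the asynchronous one: the message
// is sent exactly once, and any timeout or failure propagates to the caller
// instead of triggering a resend (which is what made askWithRetry unsafe here).
abstract class SketchEndpointRef {
  def ask[T: ClassTag](message: Any): Future[T]

  def askSync[T: ClassTag](message: Any, timeout: Duration): T = {
    val future = ask[T](message)
    Await.result(future, timeout)
  }
}
```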

How was this patch tested?

Tested manually. The scenario described above does not happen with this patch.

@squito
Contributor

squito commented Jan 24, 2017

Jenkins, ok to test

@squito
Contributor

squito commented Jan 24, 2017

cc @vanzin @zsxwing

(I was going to make a longer comment about removing askWithRetry and whether we need another method, but then saw the comments on the referenced PR -- I'll defer to the experts here)

@SparkQA

SparkQA commented Jan 24, 2017

Test build #71938 has finished for PR 16690 at commit ce5216e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 26, 2017

Test build #72027 has finished for PR 16690 at commit fe86511.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 27, 2017

Test build #72057 has finished for PR 16690 at commit 3b7e17b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jinxing64
Author

@vanzin @zsxwing
ping for review~

Contributor

@vanzin vanzin left a comment


Also, since you're adding the blocking API, please add a deprecation annotation to askWithRetry.


```
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.util.RpcUtils
```


Contributor


nit: don't add.

```
* @tparam T type of the reply message
* @return the reply message from the corresponding [[RpcEndpoint]]
*/
def askWithBlocking[T: ClassTag](message: Any): T = askWithBlocking(message, defaultAskTimeout)
```
Contributor


askWithBlocking is a weird name. I'd use blockingAsk, or askSync.

```
try {
  val future = ask[T](message, timeout)
  val result = timeout.awaitResult(future)
  if (result == null) {
```
Contributor


This is not an error. It's perfectly legitimate to return null.

```
return result
} catch {
  case NonFatal(e) =>
    throw new SparkException(
```
Contributor


Isn't it better to just propagate the original exception? You can get the context from the stack trace.

@SparkQA

SparkQA commented Jan 28, 2017

Test build #72107 has finished for PR 16690 at commit e7002c1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jinxing64
Author

@vanzin
ping for review

@vanzin
Contributor

vanzin commented Jan 30, 2017

ping for review

Please be a little more patient, especially during weekends.

@jinxing64
Author

I feel very sorry if this is disturbing : )
@vanzin Thanks a lot for continuing to review this, and I'll be more patient : )
Sorry again~~

Contributor

@vanzin vanzin left a comment


Looks good, just need to fix the versions.

```
@@ -91,6 +123,7 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
* @tparam T type of the reply message
* @return the reply message from the corresponding [[RpcEndpoint]]
*/
@deprecated("use 'askSync' instead.", "2.1.0")
```
Contributor


2.2.0

```
@@ -75,10 +106,11 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
* @tparam T type of the reply message
* @return the reply message from the corresponding [[RpcEndpoint]]
*/
@deprecated("use 'askSync' instead.", "2.1.0")
```
Contributor


2.2.0

```
@@ -19,6 +19,7 @@ package org.apache.spark.rpc

import scala.concurrent.Future
import scala.reflect.ClassTag
import scala.util.control.NonFatal
```
Contributor


not used

@SparkQA

SparkQA commented Feb 1, 2017

Test build #72228 has finished for PR 16690 at commit 42eb540.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jinxing64
Author

@vanzin
Thanks a lot for helping with this PR~ I've refined it~
Please take another look~

@vanzin
Contributor

vanzin commented Feb 1, 2017

LGTM, merging to master.

@asfgit asfgit closed this in c5fcb7f Feb 1, 2017
@jinxing64
Author

Thanks a lot for reviewing this PR~

asfgit pushed a commit that referenced this pull request Feb 3, 2017
## What changes were proposed in this pull request?

In the current code in `HeartbeatReceiverSuite`, the executorId is set as below:
```
  private val executorId1 = "executor-1"
  private val executorId2 = "executor-2"
```

The executorId is sent to the driver during registration, as below:

```
test("expire dead hosts should kill executors with replacement (SPARK-8119)")  {
  ...
  fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
      RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty))
  ...
}
```

When `CoarseGrainedSchedulerBackend` receives `RegisterExecutor`, the executorId is compared with `currentExecutorIdCounter` as below:
```
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls)  =>
  if (executorDataMap.contains(executorId)) {
    executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
    context.reply(true)
  } else {
  ...
  executorDataMap.put(executorId, data)
  if (currentExecutorIdCounter < executorId.toInt) {
    currentExecutorIdCounter = executorId.toInt
  }
  ...
```

`executorId.toInt` will throw a `NumberFormatException`.

This unit test currently passes only because of `askWithRetry`: after the exception, the RPC call is retried, and on the retry the executorId is already in `executorDataMap`, so it takes the `if` branch and replies true.
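
For reference, a quick way to see the failure mode in plain Scala (independent of the suite):

```
// "executor-1".toInt throws because the string is not a plain number,
// while a purely numeric id parses fine.
scala.util.Try("executor-1".toInt) // Failure(java.lang.NumberFormatException: For input string: "executor-1")
scala.util.Try("1".toInt)          // Success(1)
```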

**To fix**
Rectify the executorId and replace `askWithRetry` with `askSync`; refer to #16690.
## How was this patch tested?
This fix is for a unit test; no new test is needed.

Author: jinxing <jinxing@meituan.com>

Closes #16779 from jinxing64/SPARK-19437.
cmonkey pushed a commit to cmonkey/spark that referenced this pull request Feb 15, 2017
cmonkey pushed a commit to cmonkey/spark that referenced this pull request Feb 15, 2017
```
@@ -75,10 +105,11 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
* @tparam T type of the reply message
* @return the reply message from the corresponding [[RpcEndpoint]]
*/
@deprecated("use 'askSync' instead.", "2.2.0")
```
Member


It seems like this has caused the build to produce a lot of deprecation warnings. @jinxing64 could the callers of this method be changed, in Spark, to use the new alternative?

@jinxing64
Author

@srowen
What do you think about #16790?

ghost pushed a commit to dbtsai/spark that referenced this pull request Feb 19, 2017
## What changes were proposed in this pull request?

`askSync` has already been added to `RpcEndpointRef` (see SPARK-19347 and apache#16690 (comment)) and `askWithRetry` is marked as deprecated.
As mentioned in SPARK-18113 (apache#16503 (comment)):

>askWithRetry is basically an unneeded API, and a leftover from the akka days that doesn't make sense anymore. It's prone to cause deadlocks (exactly because it's blocking), it imposes restrictions on the caller (e.g. idempotency) and other things that people generally don't pay that much attention to when using it.

Since `askWithRetry` is only used inside Spark and not in user logic, it makes sense to replace all of its usages with `askSync`.
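
The call-site change is mechanical; roughly, using the `HeartbeatReceiverSuite` call quoted earlier in this thread as an illustration (not a verbatim hunk from the patch):

```
- fakeSchedulerBackend.driverEndpoint.askWithRetry[Boolean](
+ fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
      RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty))
```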

## How was this patch tested?
This PR doesn't change code logic; existing unit tests cover it.

Author: jinxing <jinxing@meituan.com>

Closes apache#16790 from jinxing64/SPARK-19450.
Yunni pushed a commit to Yunni/spark that referenced this pull request Feb 27, 2017