[SPARK-19347] ReceiverSupervisorImpl can add block to ReceiverTracker multiple times because of askWithRetry. #16690
Conversation
Force-pushed from c5bcccf to ce5216e
Jenkins, ok to test

Test build #71938 has finished for PR 16690 at commit
Force-pushed from ce5216e to fe86511

Test build #72027 has finished for PR 16690 at commit
Force-pushed from fe86511 to 3b7e17b

Test build #72057 has finished for PR 16690 at commit
Also, since you're adding the blocking API, please add a deprecation annotation to `askWithRetry`.
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.util.RpcUtils
nit: don't add.
 * @tparam T type of the reply message
 * @return the reply message from the corresponding [[RpcEndpoint]]
 */
def askWithBlocking[T: ClassTag](message: Any): T = askWithBlocking(message, defaultAskTimeout)
`askWithBlocking` is a weird name. I'd use `blockingAsk`, or `askSync`.
try {
  val future = ask[T](message, timeout)
  val result = timeout.awaitResult(future)
  if (result == null) {
This is not an error. It's perfectly legitimate to return null.
  return result
} catch {
  case NonFatal(e) =>
    throw new SparkException(
Isn't it better to just propagate the original exception? You can get the context from the stack trace.
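Taken together, these review comments point at a simple shape for the blocking call: await the future, return the reply as-is (null included), and let failures propagate unchanged. A minimal sketch under those assumptions, using a simplified `EndpointRef` with an abstract async `ask` — hypothetical names, not Spark's actual `RpcEndpointRef`:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.reflect.ClassTag

// Simplified sketch, not Spark's actual class: `ask` is the abstract
// async primitive; `askSync` just awaits it.
abstract class EndpointRef {
  def ask[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T]

  // Blocking variant: return the awaited reply as-is (null is a
  // legitimate reply) and let any failure propagate unchanged, so the
  // caller sees the original exception and its stack trace.
  def askSync[T: ClassTag](message: Any,
                           timeout: FiniteDuration = 10.seconds): T =
    Await.result(ask[T](message, timeout), timeout)
}
```

For example, an endpoint whose `ask` echoes the message back makes `askSync` return that message directly, with no wrapping of the result or of exceptions.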
Force-pushed from 3b7e17b to e7002c1

Test build #72107 has finished for PR 16690 at commit
@vanzin
Please be a little more patient, especially during weekends.
I feel very sorry if this is disturbing : )
Looks good, just need to fix the versions.
@@ -91,6 +123,7 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
 * @tparam T type of the reply message
 * @return the reply message from the corresponding [[RpcEndpoint]]
 */
@deprecated("use 'askSync' instead.", "2.1.0")
2.2.0
@@ -75,10 +106,11 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
 * @tparam T type of the reply message
 * @return the reply message from the corresponding [[RpcEndpoint]]
 */
@deprecated("use 'askSync' instead.", "2.1.0")
2.2.0
@@ -19,6 +19,7 @@ package org.apache.spark.rpc

import scala.concurrent.Future
import scala.reflect.ClassTag
import scala.util.control.NonFatal
not used
Test build #72228 has finished for PR 16690 at commit
@vanzin
LGTM, merging to master.
Thanks a lot for reviewing this PR~
## What changes were proposed in this pull request?

In the current code of `HeartbeatReceiverSuite`, executorId is set as below:
```
private val executorId1 = "executor-1"
private val executorId2 = "executor-2"
```
The executorId is sent to the driver when registering, as below:
```
test("expire dead hosts should kill executors with replacement (SPARK-8119)") {
  ...
  fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
    RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty))
  ...
}
```
On receiving `RegisterExecutor` in `CoarseGrainedSchedulerBackend`, the executorId is compared with `currentExecutorIdCounter` as below:
```
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
  if (executorDataMap.contains(executorId)) {
    executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
    context.reply(true)
  } else {
    ...
    executorDataMap.put(executorId, data)
    if (currentExecutorIdCounter < executorId.toInt) {
      currentExecutorIdCounter = executorId.toInt
    }
    ...
```
`executorId.toInt` throws a NumberFormatException here. The unit test currently passes only because of `askWithRetry`: when the exception is caught, the RPC call is made again, so the duplicate-ID `if` branch is taken and `true` is returned.

**To fix**

Rectify the executorId and replace `askWithRetry` with `askSync`; refer to #16690.

## How was this patch tested?

This fix is for a unit test; there is no need to add another one.

Author: jinxing <jinxing@meituan.com>

Closes #16779 from jinxing64/SPARK-19437.
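The failure mode described above can be seen in isolation with a hypothetical standalone snippet (not Spark code): an id with a non-numeric prefix cannot be parsed by `toInt`, which is exactly what breaks the `currentExecutorIdCounter` comparison.

```scala
// Hypothetical standalone illustration. Ids like "executor-1" cannot be
// compared numerically: toInt throws NumberFormatException.
val badId = "executor-1"
val threw =
  try { badId.toInt; false }
  catch { case _: NumberFormatException => true }
assert(threw)

// Purely numeric ids, as the test fix uses, parse fine.
assert("1".toInt == 1)
```

With `askWithRetry` the exception was silently masked by the retry; with `askSync` it surfaces immediately, which is why the test's executor ids had to be rectified in the same change.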
… multiple times because of askWithRetry.

## What changes were proposed in this pull request?

`ReceiverSupervisorImpl` on the executor side reports a block's meta back to `ReceiverTracker` on the driver side. In the current code, `askWithRetry` is used. However, for `AddBlock`, `ReceiverTracker` is not idempotent, which may result in messages being processed multiple times.

**To reproduce**:

1. Check if it is the first time `AddBlock` is received in `ReceiverTracker`; if so, sleep long enough (say 200 seconds), so that the first RPC call times out in `askWithRetry` and `AddBlock` is resent.
2. Rebuild Spark and run the following job:
```
def streamProcessing(): Unit = {
  val conf = new SparkConf()
    .setAppName("StreamingTest")
    .setMaster(masterUrl)
  val ssc = new StreamingContext(conf, Seconds(200))
  val stream = ssc.socketTextStream("localhost", 1234)
  stream.print()
  ssc.start()
  ssc.awaitTermination()
}
```

**To fix**:

It makes sense to provide a blocking version of `ask` in `RpcEndpointRef`, as mentioned in SPARK-18113 (apache#16503 (comment)), because the Netty RPC layer will not drop messages. `askWithRetry` is a leftover from the akka days. It imposes restrictions on the caller (e.g. idempotency) and other things that people generally don't pay that much attention to when using it.

## How was this patch tested?

Tested manually. The scenario described above doesn't happen with this patch.

Author: jinxing <jinxing@meituan.com>

Closes apache#16690 from jinxing64/SPARK-19347.
@@ -75,10 +105,11 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
 * @tparam T type of the reply message
 * @return the reply message from the corresponding [[RpcEndpoint]]
 */
@deprecated("use 'askSync' instead.", "2.2.0")
It seems like this has caused the build to produce a lot of deprecation warnings. @jinxing64 could the callers of this method be changed, in Spark, to use the new alternative?
## What changes were proposed in this pull request?

`askSync` is already added in `RpcEndpointRef` (see SPARK-19347 and apache#16690 (comment)) and `askWithRetry` is marked as deprecated. As mentioned in SPARK-18113 (apache#16503 (comment)):

> askWithRetry is basically an unneeded API, and a leftover from the akka days that doesn't make sense anymore. It's prone to cause deadlocks (exactly because it's blocking), it imposes restrictions on the caller (e.g. idempotency) and other things that people generally don't pay that much attention to when using it.

Since `askWithRetry` is only used inside Spark and not in user logic, it makes sense to replace all of its usages with `askSync`.

## How was this patch tested?

This PR doesn't change code logic; existing unit tests cover it.

Author: jinxing <jinxing@meituan.com>

Closes apache#16790 from jinxing64/SPARK-19450.
What changes were proposed in this pull request?

`ReceiverSupervisorImpl` on the executor side reports a block's meta back to `ReceiverTracker` on the driver side. In the current code, `askWithRetry` is used. However, for `AddBlock`, `ReceiverTracker` is not idempotent, which may result in messages being processed multiple times.

**To reproduce**:

1. Check if it is the first time `AddBlock` is received in `ReceiverTracker`; if so, sleep long enough (say 200 seconds), so that the first RPC call times out in `askWithRetry` and `AddBlock` is resent.
2. Rebuild Spark and run the socket-stream job shown in the commit message above.

**To fix**:

It makes sense to provide a blocking version of `ask` in `RpcEndpointRef`, as mentioned in SPARK-18113 (#16503 (comment)), because the Netty RPC layer will not drop messages. `askWithRetry` is a leftover from the akka days. It imposes restrictions on the caller (e.g. idempotency) and other things that people generally don't pay that much attention to when using it.

How was this patch tested?

Tested manually. The scenario described above doesn't happen with this patch.
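The non-idempotency problem can be sketched with a toy model (hypothetical names, not Spark's RPC layer): when the first reply times out, a retrying ask redelivers `AddBlock`, while a single blocking ask delivers it once and simply surfaces the timeout to the caller.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model with hypothetical names: the "tracker" just appends, so it
// is not idempotent -- a redelivered AddBlock is counted twice.
val trackedBlocks = ArrayBuffer[String]()
def addBlock(id: String): Unit = trackedBlocks += id

// askWithRetry-like behavior: the message IS processed server-side, but
// the reply is lost to a timeout, so the client resends the same message.
def askWithRetryLike(id: String, replyTimesOut: Boolean): Unit = {
  addBlock(id)
  if (replyTimesOut) addBlock(id) // the retry redelivers the message
}

// askSync-like behavior: exactly one delivery; a timeout would be
// raised to the caller instead of triggering a resend.
def askSyncLike(id: String): Unit = addBlock(id)

askWithRetryLike("block-0", replyTimesOut = true) // block-0 added twice
askSyncLike("block-1")                            // block-1 added once
```

This is only an illustration of why retry-on-timeout requires idempotent handlers; the actual fix in this PR is the `askSync` API plus the deprecation of `askWithRetry`.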