[SPARK-4802] [streaming] Remove receiverInfo once receiver is de-registered#3647
ilayaperumalg wants to merge 2 commits into apache:master
Once the streaming receiver is de-registered at the executor, the `ReceiverTrackerActor` needs to remove the corresponding receiverInfo from the `receiverInfo` map in `ReceiverTracker`.
Can one of the admins verify this patch?
This is a good catch, since it looks like the original intent was that we'd remove In fact, a much older version of this code actually had the remove call: That file got renamed, but the It looks like the bug was actually introduced in #540, possibly as a copy-paste error when copying repeated code between
It seems a little suspicious to me that we assign to receiverInfo on this line only to remove the value that we set two lines later. I think it would be clearer to remove this line and change the next line to read
listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo))
instead.
The only difference with the above approach is that, if we remove this line, the receiverInfo map (at the ReceiverTracker) would not be up to date with what is being sent to the StreamingListenerBus. Does the approach below make sense?
receiverInfo -= streamId
listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo))
Yeah, that's essentially what I was proposing. I was suggesting
listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo))
receiverInfo.remove(streamId)
but I don't think there's going to be a huge difference between this and your suggestion unless there are multi-threading concerns. Feel free to update this with your suggestion.
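To make the ordering discussed in this thread concrete, here is a minimal sketch of a de-registration handler that posts the stopped event and then drops the map entry. `TrackerSketch`, the trimmed-down `ReceiverInfo`, the `posted` buffer (standing in for `listenerBus`), and the method signatures are all assumptions made for illustration; this is not the actual Spark code.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for Spark's ReceiverInfo (fewer fields).
case class ReceiverInfo(
    streamId: Int,
    name: String,
    active: Boolean,
    lastErrorMessage: String = "",
    lastError: String = "")

sealed trait StreamingListenerEvent
case class StreamingListenerReceiverStopped(info: ReceiverInfo)
  extends StreamingListenerEvent

class TrackerSketch {
  // One entry per registered receiver, keyed by stream id.
  val receiverInfo = new mutable.HashMap[Int, ReceiverInfo]
  // Assumption: a plain buffer stands in for the listener bus.
  val posted = mutable.ArrayBuffer.empty[StreamingListenerEvent]

  def registerReceiver(streamId: Int, name: String): Unit = {
    receiverInfo(streamId) = ReceiverInfo(streamId, name, active = true)
  }

  def deregisterReceiver(streamId: Int, message: String, error: String): Unit = {
    // Build the final snapshot without writing it back to the map.
    val newReceiverInfo = receiverInfo.get(streamId) match {
      case Some(oldInfo) =>
        oldInfo.copy(active = false, lastErrorMessage = message, lastError = error)
      case None =>
        ReceiverInfo(streamId, "", active = false,
          lastErrorMessage = message, lastError = error)
    }
    // Post the snapshot first, then drop the entry so the map does not grow.
    posted += StreamingListenerReceiverStopped(newReceiverInfo)
    receiverInfo.remove(streamId)
  }
}
```

Because the event carries `newReceiverInfo` directly instead of reading it back from the map, swapping the last two statements should give the same result in single-threaded use.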
Jenkins, this is ok to test.
Test build #24507 has started for PR 3647 at commit
Test build #24507 has finished for PR 3647 at commit
Test PASSed.
Test build #24513 has started for PR 3647 at commit
@JoshRosen updated the PR. Please see my observation and comments there: https://issues.apache.org/jira/browse/SPARK-2892. There seems to be an issue where the I don't see this issue when running in
Test build #24513 has finished for PR 3647 at commit
Test PASSed.
This patch looks good to me, so I'm going to merge it into @ilayaperumalg Thanks for the pointer to that other JIRA. Let's keep investigating https://issues.apache.org/jira/browse/SPARK-2892; we can continue discussion on JIRA.
Actually, one potential concern before I merge this: if the old never removed entries from the
Hmm, so it looks like
/** Report error sent by a receiver */
private def reportError(streamId: Int, message: String, error: String) {
  val newReceiverInfo = receiverInfo.get(streamId) match {
    case Some(oldInfo) =>
      oldInfo.copy(lastErrorMessage = message, lastError = error)
    case None =>
      logWarning("No prior receiver info")
      ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
  }
  receiverInfo(streamId) = newReceiverInfo
  listenerBus.post(StreamingListenerReceiverError(receiverInfo(streamId)))
  val messageWithError = if (error != null && !error.isEmpty) {
    s"$message - $error"
  } else {
    s"$message"
  }
  logWarning(s"Error reported by receiver for stream $streamId: $messageWithError")
}
This means that we'll leak
@JoshRosen yeah, I too believe it is very unlikely. Upon deregistration of the corresponding receiver, both the
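The `case None` branch in the `reportError` snippet quoted above writes a fresh entry into the map when none exists. The sketch below reproduces only that behaviour, so the scenario under discussion (an error report arriving after the entry has been removed) can be tried in isolation. `LeakSketch` and the reduced `ReceiverInfo` are hypothetical simplifications, and logging and the listener bus are left out.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for Spark's ReceiverInfo (fewer fields).
case class ReceiverInfo(
    streamId: Int,
    name: String,
    active: Boolean,
    lastErrorMessage: String = "",
    lastError: String = "")

object LeakSketch {
  val receiverInfo = new mutable.HashMap[Int, ReceiverInfo]

  // Mirrors the shape of the quoted reportError: update the existing entry,
  // or create a placeholder entry if the stream id is unknown.
  def reportError(streamId: Int, message: String, error: String): Unit = {
    val newReceiverInfo = receiverInfo.get(streamId) match {
      case Some(oldInfo) =>
        oldInfo.copy(lastErrorMessage = message, lastError = error)
      case None =>
        ReceiverInfo(streamId, "", active = false,
          lastErrorMessage = message, lastError = error)
    }
    // This write happens in both branches, so a report for an already
    // removed receiver puts an entry back into the map.
    receiverInfo(streamId) = newReceiverInfo
  }
}
```

Registering stream 7, removing it, and then calling `LeakSketch.reportError(7, "late error", "")` leaves an entry for stream 7 in the map. Whether that sequence can occur in practice depends on message ordering between the receiver and the tracker, which this sketch does not model.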
LGTM. Merging this. |
…stered

Once the streaming receiver is de-registered at executor, the `ReceiverTrackerActor` needs to remove the corresponding receiverInfo from the `receiverInfo` map at `ReceiverTracker`.

Author: Ilayaperumal Gopinathan <igopinathan@pivotal.io>

Closes #3647 from ilayaperumalg/receiverInfo-RTracker and squashes the following commits:

6eb97d5 [Ilayaperumal Gopinathan] Polishing based on the review
3640c86 [Ilayaperumal Gopinathan] Remove receiverInfo once receiver is de-registered

(cherry picked from commit 10d69e9)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>

Conflicts:
streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala