Skip to content

Commit d7e3bfd

Browse files
committed
[SPARK-12267][CORE] Store the remote RpcEnv address to send the correct disconnetion message
Author: Shixiong Zhu <shixiong@databricks.com> Closes #10261 from zsxwing/SPARK-12267. (cherry picked from commit 8af2f8c) Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
1 parent e05364b commit d7e3bfd

File tree

4 files changed

+65
-1
lines changed

4 files changed

+65
-1
lines changed

core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ private[spark] class ApplicationInfo(
6666
nextExecutorId = 0
6767
removedExecutors = new ArrayBuffer[ExecutorDesc]
6868
executorLimit = Integer.MAX_VALUE
69+
appUIUrlAtHistoryServer = None
6970
}
7071

7172
private def newExecutorId(useID: Option[Int] = None): Int = {

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -690,7 +690,7 @@ private[deploy] object Worker extends Logging {
690690
val conf = new SparkConf
691691
val args = new WorkerArguments(argStrings, conf)
692692
val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
693-
args.memory, args.masters, args.workDir)
693+
args.memory, args.masters, args.workDir, conf = conf)
694694
rpcEnv.awaitTermination()
695695
}
696696

core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,9 @@ private[netty] class NettyRpcHandler(
560560
// A variable to track whether we should dispatch the RemoteProcessConnected message.
561561
private val clients = new ConcurrentHashMap[TransportClient, JBoolean]()
562562

563+
// A variable to track the remote RpcEnv addresses of all clients
564+
private val remoteAddresses = new ConcurrentHashMap[RpcAddress, RpcAddress]()
565+
563566
override def receive(
564567
client: TransportClient,
565568
message: ByteBuffer,
@@ -587,6 +590,12 @@ private[netty] class NettyRpcHandler(
587590
// Create a new message with the socket address of the client as the sender.
588591
RequestMessage(clientAddr, requestMessage.receiver, requestMessage.content)
589592
} else {
593+
// The remote RpcEnv listens to some port, we should also fire a RemoteProcessConnected for
594+
// the listening address
595+
val remoteEnvAddress = requestMessage.senderAddress
596+
if (remoteAddresses.putIfAbsent(clientAddr, remoteEnvAddress) == null) {
597+
dispatcher.postToAll(RemoteProcessConnected(remoteEnvAddress))
598+
}
590599
requestMessage
591600
}
592601
}
@@ -598,6 +607,12 @@ private[netty] class NettyRpcHandler(
598607
if (addr != null) {
599608
val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
600609
dispatcher.postToAll(RemoteProcessConnectionError(cause, clientAddr))
610+
// If the remove RpcEnv listens to some address, we should also fire a
611+
// RemoteProcessConnectionError for the remote RpcEnv listening address
612+
val remoteEnvAddress = remoteAddresses.get(clientAddr)
613+
if (remoteEnvAddress != null) {
614+
dispatcher.postToAll(RemoteProcessConnectionError(cause, remoteEnvAddress))
615+
}
601616
} else {
602617
// If the channel is closed before connecting, its remoteAddress will be null.
603618
// See java.net.Socket.getRemoteSocketAddress
@@ -613,6 +628,12 @@ private[netty] class NettyRpcHandler(
613628
val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
614629
nettyEnv.removeOutbox(clientAddr)
615630
dispatcher.postToAll(RemoteProcessDisconnected(clientAddr))
631+
val remoteEnvAddress = remoteAddresses.remove(clientAddr)
632+
// If the remove RpcEnv listens to some address, we should also fire a
633+
// RemoteProcessDisconnected for the remote RpcEnv listening address
634+
if (remoteEnvAddress != null) {
635+
dispatcher.postToAll(RemoteProcessDisconnected(remoteEnvAddress))
636+
}
616637
} else {
617638
// If the channel is closed before connecting, its remoteAddress will be null. In this case,
618639
// we can ignore it since we don't fire "Associated".

core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,48 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
545545
}
546546
}
547547

548+
test("network events between non-client-mode RpcEnvs") {
549+
val events = new mutable.ArrayBuffer[(Any, Any)] with mutable.SynchronizedBuffer[(Any, Any)]
550+
env.setupEndpoint("network-events-non-client", new ThreadSafeRpcEndpoint {
551+
override val rpcEnv = env
552+
553+
override def receive: PartialFunction[Any, Unit] = {
554+
case "hello" =>
555+
case m => events += "receive" -> m
556+
}
557+
558+
override def onConnected(remoteAddress: RpcAddress): Unit = {
559+
events += "onConnected" -> remoteAddress
560+
}
561+
562+
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
563+
events += "onDisconnected" -> remoteAddress
564+
}
565+
566+
override def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
567+
events += "onNetworkError" -> remoteAddress
568+
}
569+
570+
})
571+
572+
val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0, clientMode = false)
573+
// Use anotherEnv to find out the RpcEndpointRef
574+
val rpcEndpointRef = anotherEnv.setupEndpointRef(
575+
"local", env.address, "network-events-non-client")
576+
val remoteAddress = anotherEnv.address
577+
rpcEndpointRef.send("hello")
578+
eventually(timeout(5 seconds), interval(5 millis)) {
579+
assert(events.contains(("onConnected", remoteAddress)))
580+
}
581+
582+
anotherEnv.shutdown()
583+
anotherEnv.awaitTermination()
584+
eventually(timeout(5 seconds), interval(5 millis)) {
585+
assert(events.contains(("onConnected", remoteAddress)))
586+
assert(events.contains(("onDisconnected", remoteAddress)))
587+
}
588+
}
589+
548590
test("sendWithReply: unserializable error") {
549591
env.setupEndpoint("sendWithReply-unserializable-error", new RpcEndpoint {
550592
override val rpcEnv = env

0 commit comments

Comments
 (0)