Skip to content

Commit d502e5f

Browse files
committed
Handle port collisions when creating Akka systems
This requires us to handle exceptions thrown more carefully, because akka throws its own exceptions that are not java.net.BindException. We workaround this by traversing the Exception causality tree to find a java.net.BindException with an "Address already in use" message.
1 parent a2dd05c commit d502e5f

File tree

2 files changed

+35
-9
lines changed

2 files changed

+35
-9
lines changed

core/src/main/scala/org/apache/spark/util/AkkaUtils.scala

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import scala.collection.JavaConversions.mapAsJavaMap
2121
import scala.concurrent.Await
2222
import scala.concurrent.duration.{Duration, FiniteDuration}
2323

24-
import akka.actor.{Actor, ActorRef, ActorSystem, ExtendedActorSystem}
24+
import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
2525
import akka.pattern.ask
2626

2727
import com.typesafe.config.ConfigFactory
@@ -44,14 +44,28 @@ private[spark] object AkkaUtils extends Logging {
4444
* If indestructible is set to true, the Actor System will continue running in the event
4545
* of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
4646
*/
47-
def createActorSystem(name: String, host: String, port: Int,
48-
conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = {
47+
def createActorSystem(
48+
name: String,
49+
host: String,
50+
port: Int,
51+
conf: SparkConf,
52+
securityManager: SecurityManager): (ActorSystem, Int) = {
53+
val startService: Int => (ActorSystem, Int) = { actualPort =>
54+
doCreateActorSystem(name, host, actualPort, conf, securityManager)
55+
}
56+
Utils.startServiceOnPort(port, startService, name)
57+
}
58+
59+
private def doCreateActorSystem(
60+
name: String,
61+
host: String,
62+
port: Int,
63+
conf: SparkConf,
64+
securityManager: SecurityManager): (ActorSystem, Int) = {
4965

5066
val akkaThreads = conf.getInt("spark.akka.threads", 4)
5167
val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
52-
5368
val akkaTimeout = conf.getInt("spark.akka.timeout", 100)
54-
5569
val akkaFrameSize = maxFrameSizeBytes(conf)
5670
val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
5771
val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1367,10 +1367,7 @@ private[spark] object Utils extends Logging {
13671367
logInfo(s"Successfully started service$serviceString on port $port.")
13681368
return (service, port)
13691369
} catch {
1370-
case e: BindException =>
1371-
if (!e.getMessage.contains("Address already in use")) {
1372-
throw e
1373-
}
1370+
case e: Exception if isBindCollision(e) =>
13741371
if (offset >= maxRetries) {
13751372
val exceptionMessage =
13761373
s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!"
@@ -1387,4 +1384,19 @@ private[spark] object Utils extends Logging {
13871384
throw new SparkException(s"Failed to start service on port $startPort")
13881385
}
13891386

1387+
/**
1388+
* Return whether the exception is caused by an address-port collision when binding.
1389+
*/
1390+
private def isBindCollision(exception: Throwable): Boolean = {
1391+
exception match {
1392+
case e: BindException =>
1393+
if (e.getMessage != null && e.getMessage.contains("Address already in use")) {
1394+
return true
1395+
}
1396+
isBindCollision(e.getCause)
1397+
case e: Exception => isBindCollision(e.getCause)
1398+
case _ => false
1399+
}
1400+
}
1401+
13901402
}

0 commit comments

Comments
 (0)