Skip to content

Commit 0347aef

Browse files
committed
Unify port fallback logic to a single place
1 parent 24a4c32 commit 0347aef

File tree

3 files changed

+69
-39
lines changed

3 files changed

+69
-39
lines changed

core/src/main/scala/org/apache/spark/HttpServer.scala

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark
1919

2020
import java.io.File
2121

22+
import org.apache.spark.network.PortManager
2223
import org.eclipse.jetty.util.security.{Constraint, Password}
2324
import org.eclipse.jetty.security.authentication.DigestAuthenticator
2425
import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService, SecurityHandler}
@@ -47,7 +48,7 @@ private[spark] class HttpServer(resourceBase: File,
4748
private var server: Server = null
4849
private var port: Int = localPort
4950

50-
private def startOnPort(startPort: Int): Tuple2[Server,Int] = {
51+
private def startOnPort(startPort: Int): Server = {
5152
val server = new Server()
5253
val connector = new SocketConnector
5354
connector.setMaxIdleTime(60*1000)
@@ -78,34 +79,15 @@ private[spark] class HttpServer(resourceBase: File,
7879
server.start()
7980
val actualPort = server.getConnectors()(0).getLocalPort()
8081

81-
return (server, actualPort)
82-
}
83-
84-
private def startWithIncrements(startPort: Int, maxRetries: Int): Tuple2[Server,Int] = {
85-
for( offset <- 0 to maxRetries) {
86-
try {
87-
val (server, actualPort) = startOnPort(startPort + offset)
88-
return (server, actualPort)
89-
} catch {
90-
case e: java.net.BindException => {
91-
if (!e.getMessage.contains("Address already in use") ||
92-
offset == (maxRetries-1)) {
93-
throw e
94-
}
95-
logInfo("Could not bind on port: " + (startPort + offset))
96-
}
97-
case e: Exception => throw e
98-
}
99-
}
100-
return (null, -1)
82+
server
10183
}
10284

10385
def start() {
10486
if (server != null) {
10587
throw new ServerStateException("Server is already started")
10688
} else {
10789
logInfo("Starting HTTP Server")
108-
val (actualServer, actualPort) = startWithIncrements(localPort, 3)
90+
val (actualServer, actualPort) = PortManager.startWithIncrements(localPort, 3, startOnPort)
10991
server = actualServer
11092
port = actualPort
11193
}

core/src/main/scala/org/apache/spark/network/ConnectionManager.scala

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -102,24 +102,11 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
102102
serverChannel.socket.setReuseAddress(true)
103103
serverChannel.socket.setReceiveBufferSize(256 * 1024)
104104

105-
def bindWithIncrement(port: Int, maxTries: Int = 3) {
106-
for( offset <- 0 until maxTries ) {
107-
try {
108-
serverChannel.socket.bind(new InetSocketAddress(port + offset))
109-
return
110-
} catch {
111-
case e: java.net.BindException => {
112-
if(!e.getMessage.contains("Address already in use") ||
113-
offset == maxTries) {
114-
throw e
115-
}
116-
logInfo("Could not bind on port: " + (port + offset))
117-
}
118-
case e: Exception => throw e
119-
}
120-
}
105+
private def startService(port: Int) = {
106+
serverChannel.socket.bind(new InetSocketAddress(port))
107+
serverChannel
121108
}
122-
bindWithIncrement(port, 3)
109+
PortManager.startWithIncrements(port, 3, startService)
123110
serverChannel.register(selector, SelectionKey.OP_ACCEPT)
124111

125112
val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.network
19+
20+
import java.net.InetSocketAddress
21+
22+
import org.apache.spark.{Logging, SparkException}
23+
import org.eclipse.jetty.server.Server
24+
25+
private[spark] object PortManager extends Logging
26+
{
27+
28+
/**
29+
* Start service on given port, or attempt to fall back to the n+1 port for a certain number of
30+
* retries
31+
*
32+
* @param startPort
33+
* @param maxRetries Maximum number of retries to attempt. A value of e.g. 3 will cause 4
34+
* total attempts, on ports n, n+1, n+2, and n+3
35+
* @param startService Function to start service on a given port. Expected to throw a java.net
36+
* .BindException if the port is already in use
37+
* @tparam T
38+
* @throws SparkException When unable to start service in the given number of attempts
39+
* @return
40+
*/
41+
def startWithIncrements[T](startPort: Int, maxRetries: Int, startService: Int => T):
42+
(T, Int) = {
43+
for( offset <- 0 to maxRetries) {
44+
val tryPort = startPort + offset
45+
try {
46+
val service: T = startService(tryPort)
47+
return (service, tryPort)
48+
} catch {
49+
case e: java.net.BindException => {
50+
if (!e.getMessage.contains("Address already in use") ||
51+
offset == (maxRetries-1)) {
52+
throw e
53+
}
54+
logInfo("Could not bind on port: " + tryPort + ". Attempting port " + (tryPort+1))
55+
}
56+
case e: Exception => throw e
57+
}
58+
}
59+
throw new SparkException(s"Couldn't start service on port $startPort")
60+
}
61+
}

0 commit comments

Comments
 (0)