Skip to content

Commit e6ed13f

Browse files
committed
Merge pull request apache#397 from pwendell/host-port
Remove the now-unneeded hostPort option. I noticed this was logging some scary error messages in various places. After I looked into it, this option is no longer really used. I removed the option and rewrote the one remaining use case (it was unnecessary there anyway).
2 parents 0b96d85 + 0bb3307 commit e6ed13f

File tree

16 files changed

+7
-52
lines changed

16 files changed

+7
-52
lines changed

bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo
3838
}
3939
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
4040
System.clearProperty("spark.driver.port")
41-
System.clearProperty("spark.hostPort")
4241
}
4342

4443
test("halting by voting") {

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,16 @@ import org.apache.spark.storage.BlockManagerId
3232
import org.apache.spark.util.{AkkaUtils, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
3333

3434
private[spark] sealed trait MapOutputTrackerMessage
35-
private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String)
35+
private[spark] case class GetMapOutputStatuses(shuffleId: Int)
3636
extends MapOutputTrackerMessage
3737
private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
3838

3939
private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster)
4040
extends Actor with Logging {
4141
def receive = {
42-
case GetMapOutputStatuses(shuffleId: Int, requester: String) =>
43-
logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + requester)
42+
case GetMapOutputStatuses(shuffleId: Int) =>
43+
val hostPort = sender.path.address.hostPort
44+
logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
4445
sender ! tracker.getSerializedMapOutputStatuses(shuffleId)
4546

4647
case StopMapOutputTracker =>
@@ -119,11 +120,10 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
119120
if (fetchedStatuses == null) {
120121
// We won the race to fetch the output locs; do so
121122
logInfo("Doing the fetch; tracker actor = " + trackerActor)
122-
val hostPort = Utils.localHostPort(conf)
123123
// This try-finally prevents hangs due to timeouts:
124124
try {
125125
val fetchedBytes =
126-
askTracker(GetMapOutputStatuses(shuffleId, hostPort)).asInstanceOf[Array[Byte]]
126+
askTracker(GetMapOutputStatuses(shuffleId)).asInstanceOf[Array[Byte]]
127127
fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
128128
logInfo("Got the output locations")
129129
mapStatuses.put(shuffleId, fetchedStatuses)

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -132,16 +132,6 @@ object SparkEnv extends Logging {
132132
conf.set("spark.driver.port", boundPort.toString)
133133
}
134134

135-
// set only if unset until now.
136-
if (!conf.contains("spark.hostPort")) {
137-
if (!isDriver){
138-
// unexpected
139-
Utils.logErrorWithStack("Unexpected NOT to have spark.hostPort set")
140-
}
141-
Utils.checkHost(hostname)
142-
conf.set("spark.hostPort", hostname + ":" + boundPort)
143-
}
144-
145135
val classLoader = Thread.currentThread.getContextClassLoader
146136

147137
// Create an instance of the class named by the given Java system property, or by

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,6 @@ private[spark] object CoarseGrainedExecutorBackend {
103103
indestructible = true, conf = new SparkConf)
104104
// set it
105105
val sparkHostPort = hostname + ":" + boundPort
106-
// conf.set("spark.hostPort", sparkHostPort)
107106
actorSystem.actorOf(
108107
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
109108
name = "Executor")

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
165165
override def start() {
166166
val properties = new ArrayBuffer[(String, String)]
167167
for ((key, value) <- scheduler.sc.conf.getAll) {
168-
if (key.startsWith("spark.") && !key.equals("spark.hostPort")) {
168+
if (key.startsWith("spark.")) {
169169
properties += ((key, value))
170170
}
171171
}

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,6 @@ private[spark] class BlockManager(
8383

8484
val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf)
8585

86-
val hostPort = Utils.localHostPort(conf)
87-
8886
val slaveActor = actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)),
8987
name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
9088

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -420,15 +420,6 @@ private[spark] object Utils extends Logging {
420420
InetAddress.getByName(address).getHostName
421421
}
422422

423-
def localHostPort(conf: SparkConf): String = {
424-
val retval = conf.get("spark.hostPort", null)
425-
if (retval == null) {
426-
logErrorWithStack("spark.hostPort not set but invoking localHostPort")
427-
return localHostName()
428-
}
429-
retval
430-
}
431-
432423
def checkHost(host: String, message: String = "") {
433424
assert(host.indexOf(':') == -1, message)
434425
}
@@ -437,14 +428,6 @@ private[spark] object Utils extends Logging {
437428
assert(hostPort.indexOf(':') != -1, message)
438429
}
439430

440-
def logErrorWithStack(msg: String) {
441-
try {
442-
throw new Exception
443-
} catch {
444-
case ex: Exception => logError(msg, ex)
445-
}
446-
}
447-
448431
// Typically, this will be of order of number of nodes in cluster
449432
// If not, we should change it to LRUCache or something.
450433
private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]()

core/src/test/scala/org/apache/spark/LocalSparkContext.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ object LocalSparkContext {
5353
}
5454
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
5555
System.clearProperty("spark.driver.port")
56-
System.clearProperty("spark.hostPort")
5756
}
5857

5958
/** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */

core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
9999
val hostname = "localhost"
100100
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf)
101101
System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
102-
System.setProperty("spark.hostPort", hostname + ":" + boundPort)
103102

104103
val masterTracker = new MapOutputTrackerMaster(conf)
105104
masterTracker.trackerActor = actorSystem.actorOf(

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
5353
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, conf = conf)
5454
this.actorSystem = actorSystem
5555
conf.set("spark.driver.port", boundPort.toString)
56-
conf.set("spark.hostPort", "localhost:" + boundPort)
5756

5857
master = new BlockManagerMaster(
5958
actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)
@@ -65,13 +64,10 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
6564
conf.set("spark.storage.disableBlockManagerHeartBeat", "true")
6665
val initialize = PrivateMethod[Unit]('initialize)
6766
SizeEstimator invokePrivate initialize()
68-
// Set some value ...
69-
conf.set("spark.hostPort", Utils.localHostName() + ":" + 1111)
7067
}
7168

7269
after {
7370
System.clearProperty("spark.driver.port")
74-
System.clearProperty("spark.hostPort")
7571

7672
if (store != null) {
7773
store.stop()

0 commit comments

Comments (0)