Skip to content

Commit 7bcecc2

Browse files
committed
Merge branch 'master' of https://github.com/apache/spark into codereview
2 parents 4fad75a + 288ce58 commit 7bcecc2

File tree

29 files changed

+483
-243
lines changed

29 files changed

+483
-243
lines changed

core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ private[deploy] object DeployMessages {
9292

9393
case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders
9494

95+
case object ReregisterWithMaster // used when a worker attempts to reconnect to a master
96+
9597
// AppClient to Master
9698

9799
case class RegisterApplication(appDescription: ApplicationDescription)

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,22 +29,27 @@ import org.apache.spark.scheduler._
2929
import org.apache.spark.ui.SparkUI
3030
import org.apache.spark.util.Utils
3131

32+
/**
33+
* A class that provides application history from event logs stored in the file system.
34+
* This provider checks for new finished applications in the background periodically and
35+
* renders the history application UI by parsing the associated event logs.
36+
*/
3237
private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
3338
with Logging {
3439

40+
import FsHistoryProvider._
41+
3542
private val NOT_STARTED = "<Not Started>"
3643

3744
// Interval between each check for event log updates
3845
private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
3946
conf.getInt("spark.history.updateInterval", 10)) * 1000
4047

41-
private val logDir = conf.get("spark.history.fs.logDirectory", null)
42-
private val resolvedLogDir = Option(logDir)
43-
.map { d => Utils.resolveURI(d) }
44-
.getOrElse { throw new IllegalArgumentException("Logging directory must be specified.") }
48+
private val logDir = conf.getOption("spark.history.fs.logDirectory")
49+
.map { d => Utils.resolveURI(d).toString }
50+
.getOrElse(DEFAULT_LOG_DIR)
4551

46-
private val fs = Utils.getHadoopFileSystem(resolvedLogDir,
47-
SparkHadoopUtil.get.newConfiguration(conf))
52+
private val fs = Utils.getHadoopFileSystem(logDir, SparkHadoopUtil.get.newConfiguration(conf))
4853

4954
// A timestamp of when the disk was last accessed to check for log updates
5055
private var lastLogCheckTimeMs = -1L
@@ -87,14 +92,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
8792

8893
private def initialize() {
8994
// Validate the log directory.
90-
val path = new Path(resolvedLogDir)
95+
val path = new Path(logDir)
9196
if (!fs.exists(path)) {
92-
throw new IllegalArgumentException(
93-
"Logging directory specified does not exist: %s".format(resolvedLogDir))
97+
var msg = s"Log directory specified does not exist: $logDir."
98+
if (logDir == DEFAULT_LOG_DIR) {
99+
msg += " Did you configure the correct one through spark.fs.history.logDirectory?"
100+
}
101+
throw new IllegalArgumentException(msg)
94102
}
95103
if (!fs.getFileStatus(path).isDir) {
96104
throw new IllegalArgumentException(
97-
"Logging directory specified is not a directory: %s".format(resolvedLogDir))
105+
"Logging directory specified is not a directory: %s".format(logDir))
98106
}
99107

100108
checkForLogs()
@@ -134,8 +142,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
134142
}
135143
}
136144

137-
override def getConfig(): Map[String, String] =
138-
Map("Event Log Location" -> resolvedLogDir.toString)
145+
override def getConfig(): Map[String, String] = Map("Event log directory" -> logDir.toString)
139146

140147
/**
141148
* Builds the application list based on the current contents of the log directory.
@@ -146,7 +153,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
146153
lastLogCheckTimeMs = getMonotonicTimeMs()
147154
logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
148155
try {
149-
val logStatus = fs.listStatus(new Path(resolvedLogDir))
156+
val logStatus = fs.listStatus(new Path(logDir))
150157
val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
151158

152159
// Load all new logs from the log directory. Only directories that have a modification time
@@ -244,6 +251,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
244251

245252
}
246253

254+
private object FsHistoryProvider {
255+
val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
256+
}
257+
247258
private class FsApplicationHistoryInfo(
248259
val logDir: String,
249260
id: String,

core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,13 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
5858
</h4> ++
5959
appTable
6060
} else {
61-
<h4>No Completed Applications Found</h4>
61+
<h4>No completed applications found!</h4> ++
62+
<p>Did you specify the correct logging directory?
63+
Please verify your setting of <span style="font-style:italic">
64+
spark.history.fs.logDirectory</span> and whether you have the permissions to
65+
access it.<br /> It is also possible that your application did not run to
66+
completion or did not stop the SparkContext.
67+
</p>
6268
}
6369
}
6470
</div>

core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,22 @@
1717

1818
package org.apache.spark.deploy.history
1919

20-
import org.apache.spark.SparkConf
20+
import org.apache.spark.{Logging, SparkConf}
2121
import org.apache.spark.util.Utils
2222

2323
/**
2424
* Command-line parser for the history server.
2525
*/
26-
private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) {
27-
private var logDir: String = null
26+
private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) extends Logging {
2827
private var propertiesFile: String = null
2928

3029
parse(args.toList)
3130

3231
private def parse(args: List[String]): Unit = {
3332
args match {
3433
case ("--dir" | "-d") :: value :: tail =>
35-
logDir = value
34+
logWarning("Setting log directory through the command line is deprecated as of " +
35+
"Spark 1.1.0. Please set this through spark.history.fs.logDirectory instead.")
3636
conf.set("spark.history.fs.logDirectory", value)
3737
System.setProperty("spark.history.fs.logDirectory", value)
3838
parse(tail)
@@ -78,9 +78,10 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
7878
| (default 50)
7979
|FsHistoryProvider options:
8080
|
81-
| spark.history.fs.logDirectory Directory where app logs are stored (required)
82-
| spark.history.fs.updateInterval How often to reload log data from storage (in seconds,
83-
| default 10)
81+
| spark.history.fs.logDirectory Directory where app logs are stored
82+
| (default: file:/tmp/spark-events)
83+
| spark.history.fs.updateInterval How often to reload log data from storage
84+
| (in seconds, default: 10)
8485
|""".stripMargin)
8586
System.exit(exitCode)
8687
}

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import java.io.File
2121
import java.io.IOException
2222
import java.text.SimpleDateFormat
2323
import java.util.{UUID, Date}
24-
import java.util.concurrent.TimeUnit
2524

2625
import scala.collection.JavaConversions._
2726
import scala.collection.mutable.HashMap
@@ -177,6 +176,9 @@ private[spark] class Worker(
177176
throw new SparkException("Invalid spark URL: " + x)
178177
}
179178
connected = true
179+
// Cancel any outstanding re-registration attempts because we found a new master
180+
registrationRetryTimer.foreach(_.cancel())
181+
registrationRetryTimer = None
180182
}
181183

182184
private def tryRegisterAllMasters() {
@@ -187,20 +189,53 @@ private[spark] class Worker(
187189
}
188190
}
189191

190-
private def retryConnectToMaster() {
192+
/**
193+
* Re-register with the master because a network failure or a master failure has occurred.
194+
* If the re-registration attempt threshold is exceeded, the worker exits with error.
195+
* Note that for thread-safety this should only be called from the actor.
196+
*/
197+
private def reregisterWithMaster(): Unit = {
191198
Utils.tryOrExit {
192199
connectionAttemptCount += 1
193200
if (registered) {
194201
registrationRetryTimer.foreach(_.cancel())
195202
registrationRetryTimer = None
196203
} else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) {
197204
logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)")
198-
tryRegisterAllMasters()
205+
/**
206+
* Re-register with the active master this worker has been communicating with. If there
207+
* is none, then it means this worker is still bootstrapping and hasn't established a
208+
* connection with a master yet, in which case we should re-register with all masters.
209+
*
210+
* It is important to re-register only with the active master during failures. Otherwise,
211+
* if the worker unconditionally attempts to re-register with all masters, the following
212+
* race condition may arise and cause a "duplicate worker" error detailed in SPARK-4592:
213+
*
214+
* (1) Master A fails and Worker attempts to reconnect to all masters
215+
* (2) Master B takes over and notifies Worker
216+
* (3) Worker responds by registering with Master B
217+
* (4) Meanwhile, Worker's previous reconnection attempt reaches Master B,
218+
* causing the same Worker to register with Master B twice
219+
*
220+
* Instead, if we only register with the known active master, we can assume that the
221+
* old master must have died because another master has taken over. Note that this is
222+
* still not safe if the old master recovers within this interval, but this is a much
223+
* less likely scenario.
224+
*/
225+
if (master != null) {
226+
master ! RegisterWorker(
227+
workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
228+
} else {
229+
// We are retrying the initial registration
230+
tryRegisterAllMasters()
231+
}
232+
// We have exceeded the initial registration retry threshold
233+
// All retries from now on should use a higher interval
199234
if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
200235
registrationRetryTimer.foreach(_.cancel())
201236
registrationRetryTimer = Some {
202237
context.system.scheduler.schedule(PROLONGED_REGISTRATION_RETRY_INTERVAL,
203-
PROLONGED_REGISTRATION_RETRY_INTERVAL)(retryConnectToMaster)
238+
PROLONGED_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
204239
}
205240
}
206241
} else {
@@ -220,7 +255,7 @@ private[spark] class Worker(
220255
connectionAttemptCount = 0
221256
registrationRetryTimer = Some {
222257
context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,
223-
INITIAL_REGISTRATION_RETRY_INTERVAL)(retryConnectToMaster)
258+
INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
224259
}
225260
case Some(_) =>
226261
logInfo("Not spawning another attempt to register with the master, since there is an" +
@@ -400,12 +435,15 @@ private[spark] class Worker(
400435
logInfo(s"$x Disassociated !")
401436
masterDisconnected()
402437

403-
case RequestWorkerState => {
438+
case RequestWorkerState =>
404439
sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
405440
finishedExecutors.values.toList, drivers.values.toList,
406441
finishedDrivers.values.toList, activeMasterUrl, cores, memory,
407442
coresUsed, memoryUsed, activeMasterWebUiUrl)
408-
}
443+
444+
case ReregisterWithMaster =>
445+
reregisterWithMaster()
446+
409447
}
410448

411449
private def masterDisconnected() {

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ private[spark] class Executor(
334334
* SparkContext. Also adds any new JARs we fetched to the class loader.
335335
*/
336336
private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
337-
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
337+
lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
338338
synchronized {
339339
// Fetch missing dependencies
340340
for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {

core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,24 @@ package org.apache.spark.network.netty
2020
import org.apache.spark.SparkConf
2121
import org.apache.spark.network.util.{TransportConf, ConfigProvider}
2222

23+
/**
24+
* Provides a utility for transforming from a SparkConf inside a Spark JVM (e.g., Executor,
25+
* Driver, or a standalone shuffle service) into a TransportConf with details on our environment
26+
* like the number of cores that are allocated to this JVM.
27+
*/
2328
object SparkTransportConf {
29+
/**
30+
* Specifies an upper bound on the number of Netty threads that Spark requires by default.
31+
* In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core
32+
* that we use will have an initial overhead of roughly 32 MB of off-heap memory, which comes
33+
* at a premium.
34+
*
35+
* Thus, this value should still retain maximum throughput and reduce wasted off-heap memory
36+
* allocation. It can be overridden by setting the number of serverThreads and clientThreads
37+
* manually in Spark's configuration.
38+
*/
39+
private val MAX_DEFAULT_NETTY_THREADS = 8
40+
2441
/**
2542
* Utility for creating a [[TransportConf]] from a [[SparkConf]].
2643
* @param numUsableCores if nonzero, this will restrict the server and client threads to only
@@ -29,15 +46,28 @@ object SparkTransportConf {
2946
*/
3047
def fromSparkConf(_conf: SparkConf, numUsableCores: Int = 0): TransportConf = {
3148
val conf = _conf.clone
32-
if (numUsableCores > 0) {
33-
// Only set if serverThreads/clientThreads not already set.
34-
conf.set("spark.shuffle.io.serverThreads",
35-
conf.get("spark.shuffle.io.serverThreads", numUsableCores.toString))
36-
conf.set("spark.shuffle.io.clientThreads",
37-
conf.get("spark.shuffle.io.clientThreads", numUsableCores.toString))
38-
}
49+
50+
// Specify thread configuration based on our JVM's allocation of cores (rather than necessarily
51+
// assuming we have all the machine's cores).
52+
// NB: Only set if serverThreads/clientThreads not already set.
53+
val numThreads = defaultNumThreads(numUsableCores)
54+
conf.set("spark.shuffle.io.serverThreads",
55+
conf.get("spark.shuffle.io.serverThreads", numThreads.toString))
56+
conf.set("spark.shuffle.io.clientThreads",
57+
conf.get("spark.shuffle.io.clientThreads", numThreads.toString))
58+
3959
new TransportConf(new ConfigProvider {
4060
override def get(name: String): String = conf.get(name)
4161
})
4262
}
63+
64+
/**
65+
* Returns the default number of threads for both the Netty client and server thread pools.
66+
* If numUsableCores is 0, we will use Runtime to get an approximate number of available cores.
67+
*/
68+
private def defaultNumThreads(numUsableCores: Int): Int = {
69+
val availableCores =
70+
if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
71+
math.min(availableCores, MAX_DEFAULT_NETTY_THREADS)
72+
}
4373
}

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -688,6 +688,10 @@ private[spark] object JsonProtocol {
688688
}
689689

690690
def blockManagerIdFromJson(json: JValue): BlockManagerId = {
691+
// On metadata fetch fail, block manager ID can be null (SPARK-4471)
692+
if (json == JNothing) {
693+
return null
694+
}
691695
val executorId = (json \ "Executor ID").extract[String]
692696
val host = (json \ "Host").extract[String]
693697
val port = (json \ "Port").extract[Int]

core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package org.apache.spark.util
1919

2020
import java.util.Properties
2121

22+
import org.apache.spark.shuffle.MetadataFetchFailedException
23+
2224
import scala.collection.Map
2325

2426
import org.json4s.jackson.JsonMethods._
@@ -116,10 +118,13 @@ class JsonProtocolSuite extends FunSuite {
116118
// TaskEndReason
117119
val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19,
118120
"Some exception")
121+
val fetchMetadataFailed = new MetadataFetchFailedException(17,
122+
19, "metadata Fetch failed exception").toTaskEndReason
119123
val exceptionFailure = new ExceptionFailure(exception, None)
120124
testTaskEndReason(Success)
121125
testTaskEndReason(Resubmitted)
122126
testTaskEndReason(fetchFailed)
127+
testTaskEndReason(fetchMetadataFailed)
123128
testTaskEndReason(exceptionFailure)
124129
testTaskEndReason(TaskResultLost)
125130
testTaskEndReason(TaskKilled)
@@ -431,9 +436,13 @@ class JsonProtocolSuite extends FunSuite {
431436
}
432437

433438
private def assertEquals(bm1: BlockManagerId, bm2: BlockManagerId) {
434-
assert(bm1.executorId === bm2.executorId)
435-
assert(bm1.host === bm2.host)
436-
assert(bm1.port === bm2.port)
439+
if (bm1 == null || bm2 == null) {
440+
assert(bm1 === bm2)
441+
} else {
442+
assert(bm1.executorId === bm2.executorId)
443+
assert(bm1.host === bm2.host)
444+
assert(bm1.port === bm2.port)
445+
}
437446
}
438447

439448
private def assertEquals(result1: JobResult, result2: JobResult) {

dev/create-release/create-release.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ make_binary_release "hadoop2.3" "-Phadoop-2.3 -Phive -Phive-thriftserver -Pyarn"
197197
make_binary_release "hadoop2.4" "-Phadoop-2.4 -Phive -Phive-thriftserver -Pyarn" &
198198
make_binary_release "mapr3" "-Pmapr3 -Phive -Phive-thriftserver" &
199199
make_binary_release "mapr4" "-Pmapr4 -Pyarn -Phive -Phive-thriftserver" &
200+
make_binary_release "hadoop2.4-without-hive" "-Phadoop-2.4 -Pyarn" &
200201
wait
201202

202203
# Copy data

0 commit comments

Comments
 (0)