Commit cb1852d

Merge pull request #2 from apache/master
merge latest spark
2 parents c3f046f + 7ac072f

127 files changed: +4119 / -1199 lines. (Large commit: only a subset of the changed files is shown below.)

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 29 additions & 15 deletions
@@ -105,9 +105,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     cleaningThread.start()
   }
 
-  /** Stop the cleaner. */
+  /**
+   * Stop the cleaning thread and wait until the thread has finished running its current task.
+   */
   def stop() {
     stopped = true
+    // Interrupt the cleaning thread, but wait until the current task has finished before
+    // doing so. This guards against the race condition where a cleaning thread may
+    // potentially clean similarly named variables created by a different SparkContext,
+    // resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132).
+    synchronized {
+      cleaningThread.interrupt()
+    }
+    cleaningThread.join()
   }
 
   /** Register a RDD for cleanup when it is garbage collected. */
@@ -140,21 +150,25 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
       try {
         val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
           .map(_.asInstanceOf[CleanupTaskWeakReference])
-        reference.map(_.task).foreach { task =>
-          logDebug("Got cleaning task " + task)
-          referenceBuffer -= reference.get
-          task match {
-            case CleanRDD(rddId) =>
-              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
-            case CleanShuffle(shuffleId) =>
-              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
-            case CleanBroadcast(broadcastId) =>
-              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
-            case CleanAccum(accId) =>
-              doCleanupAccum(accId, blocking = blockOnCleanupTasks)
+        // Synchronize here to avoid being interrupted on stop()
+        synchronized {
+          reference.map(_.task).foreach { task =>
+            logDebug("Got cleaning task " + task)
+            referenceBuffer -= reference.get
+            task match {
+              case CleanRDD(rddId) =>
+                doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
+              case CleanShuffle(shuffleId) =>
+                doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
+              case CleanBroadcast(broadcastId) =>
+                doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
+              case CleanAccum(accId) =>
+                doCleanupAccum(accId, blocking = blockOnCleanupTasks)
+            }
           }
         }
       } catch {
+        case ie: InterruptedException if stopped => // ignore
         case e: Exception => logError("Error in cleaning thread", e)
       }
     }
@@ -188,10 +202,10 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   /** Perform broadcast cleanup. */
   def doCleanupBroadcast(broadcastId: Long, blocking: Boolean) {
     try {
-      logDebug("Cleaning broadcast " + broadcastId)
+      logDebug(s"Cleaning broadcast $broadcastId")
       broadcastManager.unbroadcast(broadcastId, true, blocking)
       listeners.foreach(_.broadcastCleaned(broadcastId))
-      logInfo("Cleaned broadcast " + broadcastId)
+      logDebug(s"Cleaned broadcast $broadcastId")
     } catch {
       case e: Exception => logError("Error cleaning broadcast " + broadcastId, e)
     }
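The locking protocol above is easy to get wrong, so here is a minimal, self-contained sketch of it. Everything below (the Cleaner class, its string queue, the poll timeout) is an illustrative stand-in, not Spark code: the point is only that the worker holds the same monitor while processing a task that stop() takes before interrupting, so the interrupt can only land while the worker is idle.

import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

class Cleaner {
  @volatile private var stopped = false
  private val queue = new LinkedBlockingQueue[String]()

  private val cleaningThread = new Thread("cleaner") {
    override def run(): Unit = {
      while (!stopped) {
        try {
          // Block outside the lock, so an interrupt cancels an idle poll
          // rather than work in progress.
          val task = Option(queue.poll(100, TimeUnit.MILLISECONDS))
          // Hold the lock while processing: stop() cannot interrupt mid-task.
          Cleaner.this.synchronized {
            task.foreach(t => println(s"cleaning $t"))
          }
        } catch {
          case _: InterruptedException if stopped => // expected during shutdown
        }
      }
    }
  }
  cleaningThread.setDaemon(true)

  def start(): Unit = cleaningThread.start()

  def submit(task: String): Unit = queue.put(task)

  def stop(): Unit = {
    stopped = true
    // Taking the lock before interrupting guarantees the interrupt lands
    // between tasks, never in the middle of one (the SPARK-6132 race).
    synchronized {
      cleaningThread.interrupt()
    }
    cleaningThread.join()
  }
}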

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 11 additions & 4 deletions
@@ -68,7 +68,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     if (value == null) {
       throw new NullPointerException("null value for " + key)
     }
-    settings.put(translateConfKey(key, warn = true), value)
+    settings.put(key, value)
     this
   }
 
@@ -140,7 +140,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
   /** Set a parameter if it isn't already configured */
   def setIfMissing(key: String, value: String): SparkConf = {
-    settings.putIfAbsent(translateConfKey(key, warn = true), value)
+    settings.putIfAbsent(key, value)
     this
   }
 
@@ -176,7 +176,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
   /** Get a parameter as an Option */
   def getOption(key: String): Option[String] = {
-    Option(settings.get(translateConfKey(key)))
+    Option(settings.get(key))
   }
 
   /** Get all parameters as a list of pairs */
@@ -229,7 +229,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   def getAppId: String = get("spark.app.id")
 
   /** Does the configuration contain a given parameter? */
-  def contains(key: String): Boolean = settings.containsKey(translateConfKey(key))
+  def contains(key: String): Boolean = settings.containsKey(key)
 
   /** Copy this object */
   override def clone: SparkConf = {
@@ -343,6 +343,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
         }
       }
     }
+
+    // Warn against the use of deprecated configs
+    deprecatedConfigs.values.foreach { dc =>
+      if (contains(dc.oldName)) {
+        dc.warn()
+      }
+    }
   }
 
   /**
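Taken together, the SparkConf hunks move deprecation handling out of the hot path: translateConfKey was called on every set/get/contains, while the new code scans for deprecated names once, during validation. A hedged sketch of that scan, with a simplified DeprecatedConfig (Spark's real class carries more metadata, and the map entry below is only an example):

case class DeprecatedConfig(oldName: String, newName: String, version: String) {
  def warn(): Unit =
    println(s"WARN: $oldName is deprecated as of Spark $version; use $newName instead.")
}

object ConfValidationSketch {
  // Example entry only, not an authoritative list of Spark's deprecated keys.
  private val deprecatedConfigs: Map[String, DeprecatedConfig] = Map(
    "spark.old.setting" ->
      DeprecatedConfig("spark.old.setting", "spark.new.setting", "1.3")
  )

  // Called once per validation pass instead of translating keys on every
  // get/set/contains call.
  def warnDeprecated(settings: Map[String, String]): Unit = {
    deprecatedConfigs.values.foreach { dc =>
      if (settings.contains(dc.oldName)) {
        dc.warn()
      }
    }
  }
}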

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 11 additions & 2 deletions
@@ -51,6 +51,7 @@ import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.executor.TriggerThreadDump
 import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat,
   FixedLengthBinaryInputFormat}
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
@@ -233,6 +234,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
       None
     }
   }
+  private[spark] val eventLogCodec: Option[String] = {
+    val compress = conf.getBoolean("spark.eventLog.compress", false)
+    if (compress && isEventLogEnabled) {
+      Some(CompressionCodec.getCodecName(conf)).map(CompressionCodec.getShortName)
+    } else {
+      None
+    }
+  }
 
   // Generate the random name for a temp folder in Tachyon
   // Add a timestamp as the suffix here to make it more safe
@@ -1383,10 +1392,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
   /** Shut down the SparkContext. */
   def stop() {
     SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
-      postApplicationEnd()
-      ui.foreach(_.stop())
       if (!stopped) {
         stopped = true
+        postApplicationEnd()
+        ui.foreach(_.stop())
         env.metricsSystem.report()
         metadataCleaner.cancel()
         cleaner.foreach(_.stop())
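The new eventLogCodec field resolves to a codec short name only when both spark.eventLog.compress and event logging itself are enabled. A small sketch of that decision with stand-in helpers (Spark's CompressionCodec.getCodecName and getShortName operate on a SparkConf and codec class names; plain maps and strings are used here):

object EventLogCodecSketch {
  // Stand-in for CompressionCodec.getCodecName(conf): the configured codec,
  // with a fallback default.
  def getCodecName(conf: Map[String, String]): String =
    conf.getOrElse("spark.io.compression.codec", "snappy")

  // Stand-in for CompressionCodec.getShortName: reduce a codec name to a
  // short tag such as "lzf" or "snappy".
  def getShortName(codecName: String): String =
    codecName.split('.').last.stripSuffix("CompressionCodec").toLowerCase

  def eventLogCodec(conf: Map[String, String], isEventLogEnabled: Boolean): Option[String] = {
    val compress = conf.getOrElse("spark.eventLog.compress", "false").toBoolean
    // Record a codec hint only when compressed event logs will actually be
    // written; readers of uncompressed logs need no codec information.
    if (compress && isEventLogEnabled) {
      Some(getCodecName(conf)).map(getShortName)
    } else {
      None
    }
  }
}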

core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala

Lines changed: 7 additions & 3 deletions
@@ -23,7 +23,9 @@ private[spark] class ApplicationDescription(
     val memoryPerSlave: Int,
     val command: Command,
     var appUiUrl: String,
-    val eventLogDir: Option[String] = None)
+    val eventLogDir: Option[String] = None,
+    // short name of compression codec used when writing event logs, if any (e.g. lzf)
+    val eventLogCodec: Option[String] = None)
   extends Serializable {
 
   val user = System.getProperty("user.name", "<unknown>")
@@ -34,8 +36,10 @@ private[spark] class ApplicationDescription(
       memoryPerSlave: Int = memoryPerSlave,
       command: Command = command,
       appUiUrl: String = appUiUrl,
-      eventLogDir: Option[String] = eventLogDir): ApplicationDescription =
-    new ApplicationDescription(name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir)
+      eventLogDir: Option[String] = eventLogDir,
+      eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription =
+    new ApplicationDescription(
+      name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec)
 
   override def toString: String = "ApplicationDescription(" + name + ")"
 }
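ApplicationDescription's hand-rolled copy defaults every parameter to the current instance's value, which is why widening the class with eventLogCodec stays source-compatible: existing call sites never mention the new parameter. A reduced sketch of the pattern (this stand-in keeps only two of the real constructor's fields):

class AppDesc(
    val name: String,
    val eventLogDir: Option[String] = None,
    // Short name of the codec used when writing event logs, if any (e.g. "lzf").
    val eventLogCodec: Option[String] = None) {

  // Each default refers to the current instance's field, so adding a
  // parameter does not break existing callers.
  def copy(
      name: String = name,
      eventLogDir: Option[String] = eventLogDir,
      eventLogCodec: Option[String] = eventLogCodec): AppDesc =
    new AppDesc(name, eventLogDir, eventLogCodec)
}

// Existing callers compile unchanged:
//   val renamed = desc.copy(name = "renamed")
// New callers can opt in to the added field:
//   val withCodec = desc.copy(eventLogCodec = Some("lzf"))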
