
Commit 00bfbd7

Merge branch 'master' of git://git.apache.org/spark into SPARK-3829

2 parents: dd87480 + 553737c

17 files changed: +270 −156 lines


bin/compute-classpath.cmd (+1 −1)

cmd.exe has no != operator for string comparison in `if`, so the old test was never valid batch syntax; the fix uses the standard `if not "..."=="..."` form.

@@ -38,7 +38,7 @@ if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
 rem Build up classpath
 set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH%

-if "x%SPARK_CONF_DIR%"!="x" (
+if not "x%SPARK_CONF_DIR%"=="x" (
   set CLASSPATH=%CLASSPATH%;%SPARK_CONF_DIR%
 ) else (
   set CLASSPATH=%CLASSPATH%;%FWDIR%conf

core/src/main/resources/org/apache/spark/ui/static/webui.css (+5)

@@ -51,6 +51,11 @@ table.sortable thead {
   cursor: pointer;
 }

+table.sortable td {
+  word-wrap: break-word;
+  max-width: 600px;
+}
+
 .progress {
   margin-bottom: 0px; position: relative
 }

core/src/main/scala/org/apache/spark/CacheManager.scala (−2)

This warning is superseded by MemoryStore.logUnrollFailureMessage (added in the MemoryStore diff below), which logs the same failure once with fuller memory-usage detail, so CacheManager no longer duplicates it.

@@ -168,8 +168,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
         arr.iterator.asInstanceOf[Iterator[T]]
       case Right(it) =>
         // There is not enough space to cache this partition in memory
-        logWarning(s"Not enough space to cache partition $key in memory! " +
-          s"Free memory is ${blockManager.memoryStore.freeMemory} bytes.")
         val returnValues = it.asInstanceOf[Iterator[T]]
         if (putLevel.useDisk) {
           logWarning(s"Persisting partition $key to disk instead.")

core/src/main/scala/org/apache/spark/SparkEnv.scala (+8 −11)

@@ -43,9 +43,8 @@ import org.apache.spark.util.{AkkaUtils, Utils}
  * :: DeveloperApi ::
  * Holds all the runtime environment objects for a running Spark instance (either master or worker),
  * including the serializer, Akka actor system, block manager, map output tracker, etc. Currently
- * Spark code finds the SparkEnv through a thread-local variable, so each thread that accesses these
- * objects needs to have the right SparkEnv set. You can get the current environment with
- * SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
+ * Spark code finds the SparkEnv through a global variable, so all the threads can access the same
+ * SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext).
  *
  * NOTE: This is not intended for external use. This is exposed for Shark and may be made private
  * in a future release.
@@ -119,30 +118,28 @@ class SparkEnv (
 }

 object SparkEnv extends Logging {
-  private val env = new ThreadLocal[SparkEnv]
-  @volatile private var lastSetSparkEnv : SparkEnv = _
+  @volatile private var env: SparkEnv = _

   private[spark] val driverActorSystemName = "sparkDriver"
   private[spark] val executorActorSystemName = "sparkExecutor"

   def set(e: SparkEnv) {
-    lastSetSparkEnv = e
-    env.set(e)
+    env = e
   }

   /**
-   * Returns the ThreadLocal SparkEnv, if non-null. Else returns the SparkEnv
-   * previously set in any thread.
+   * Returns the SparkEnv.
    */
   def get: SparkEnv = {
-    Option(env.get()).getOrElse(lastSetSparkEnv)
+    env
   }

   /**
    * Returns the ThreadLocal SparkEnv.
    */
+  @deprecated("Use SparkEnv.get instead", "1.2")
   def getThreadLocal: SparkEnv = {
-    env.get()
+    env
   }

   private[spark] def create(
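
The core of this merge is the switch from a per-thread SparkEnv to a single process-wide one (getThreadLocal is kept only as a deprecated alias for get). A minimal sketch of the two patterns, with a hypothetical Env class rather than Spark's API, shows why the diffs below can simply delete their SparkEnv.set(env) calls: with a @volatile global, a value set once on any thread is visible to every other thread, so worker threads no longer need to re-set it.

// Minimal sketch (not Spark code): per-thread vs. process-wide lookup.
class Env(val name: String)

object ThreadLocalStyle {
  private val env = new ThreadLocal[Env]
  def set(e: Env): Unit = env.set(e)
  // Each thread sees only the value it set itself; a worker thread that
  // never called set() reads null -- hence the old SparkEnv.set(env)
  // boilerplate scattered through the codebase.
  def get: Env = env.get()
}

object GlobalStyle {
  // @volatile makes a write on one thread visible to reads on all others,
  // so a single set() at startup is enough.
  @volatile private var env: Env = _
  def set(e: Env): Unit = { env = e }
  def get: Env = env
}

object Demo extends App {
  ThreadLocalStyle.set(new Env("driver"))
  GlobalStyle.set(new Env("driver"))
  val worker = new Thread(new Runnable {
    override def run(): Unit = {
      assert(ThreadLocalStyle.get == null)     // not visible on this thread
      assert(GlobalStyle.get.name == "driver") // visible everywhere
    }
  })
  worker.start()
  worker.join()
}

This is what makes the one-line removals in Executor, PipedRDD, DAGScheduler, TaskSchedulerImpl, and the streaming scheduler classes below safe: those calls existed only to copy the env into each worker thread's ThreadLocal.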

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala (+5 −1)

@@ -196,7 +196,6 @@ private[spark] class PythonRDD(

     override def run(): Unit = Utils.logUncaughtExceptions {
       try {
-        SparkEnv.set(env)
         val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
         val dataOut = new DataOutputStream(stream)
         // Partition index
@@ -248,6 +247,11 @@ private[spark] class PythonRDD(
           // will kill the whole executor (see org.apache.spark.executor.Executor).
           _exception = e
           worker.shutdownOutput()
+      } finally {
+        // Release memory used by this thread for shuffles
+        env.shuffleMemoryManager.releaseMemoryForThisThread()
+        // Release memory used by this thread for unrolling blocks
+        env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
       }
     }
   }
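
Because shuffle and unroll memory are accounted per thread ID, the writer thread must release its own reservations even when writing fails, which is why the release calls land in a finally block. A generic sketch of the pattern, using a hypothetical Ledger rather than Spark's ShuffleMemoryManager:

import scala.collection.mutable

// Sketch: a per-thread memory ledger released in `finally`, mirroring
// the releaseMemoryForThisThread() calls added above. Ledger is
// hypothetical, not a Spark API.
object Ledger {
  private val bytesByThread = mutable.Map[Long, Long]()

  def reserve(bytes: Long): Unit = synchronized {
    val id = Thread.currentThread().getId
    bytesByThread(id) = bytesByThread.getOrElse(id, 0L) + bytes
  }

  def releaseForThisThread(): Unit = synchronized {
    bytesByThread.remove(Thread.currentThread().getId)
  }

  def total: Long = synchronized { bytesByThread.values.sum }
}

object WriterThreadDemo extends App {
  val t = new Thread(new Runnable {
    override def run(): Unit = {
      try {
        Ledger.reserve(1024)
        // ... write the partition to the worker; this may throw ...
      } finally {
        Ledger.releaseForThisThread() // runs on success *and* failure
      }
    }
  })
  t.start(); t.join()
  assert(Ledger.total == 0L) // nothing leaked
}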

core/src/main/scala/org/apache/spark/executor/Executor.scala (−2)

@@ -148,7 +148,6 @@ private[spark] class Executor(

     override def run() {
       val startTime = System.currentTimeMillis()
-      SparkEnv.set(env)
       Thread.currentThread.setContextClassLoader(replClassLoader)
       val ser = SparkEnv.get.closureSerializer.newInstance()
       logInfo(s"Running $taskName (TID $taskId)")
@@ -158,7 +157,6 @@ private[spark] class Executor(
       val startGCTime = gcTime

       try {
-        SparkEnv.set(env)
         Accumulators.clear()
         val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
         updateDependencies(taskFiles, taskJars)

core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala (−1)

@@ -131,7 +131,6 @@ private[spark] class PipedRDD[T: ClassTag](
     // Start a thread to feed the process input from our parent's iterator
     new Thread("stdin writer for " + command) {
       override def run() {
-        SparkEnv.set(env)
         val out = new PrintWriter(proc.getOutputStream)

         // input the pipe context firstly

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (−1)

@@ -630,7 +630,6 @@ class DAGScheduler(
   protected def runLocallyWithinThread(job: ActiveJob) {
     var jobResult: JobResult = JobSucceeded
     try {
-      SparkEnv.set(env)
       val rdd = job.finalStage.rdd
       val split = rdd.partitions(job.partitions(0))
       val taskContext =

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala (−2)

@@ -216,8 +216,6 @@ private[spark] class TaskSchedulerImpl(
    * that tasks are balanced across the cluster.
    */
  def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
-    SparkEnv.set(sc.env)
-
    // Mark each slave as alive and remember its hostname
    // Also track if new executor is added
    var newExecAvail = false

core/src/main/scala/org/apache/spark/storage/MemoryStore.scala (+39 −6)

@@ -132,8 +132,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         PutResult(res.size, res.data, droppedBlocks)
       case Right(iteratorValues) =>
         // Not enough space to unroll this block; drop to disk if applicable
-        logWarning(s"Not enough space to store block $blockId in memory! " +
-          s"Free memory is $freeMemory bytes.")
         if (level.useDisk && allowPersistToDisk) {
           logWarning(s"Persisting block $blockId to disk instead.")
           val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
@@ -265,6 +263,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
           Left(vector.toArray)
         } else {
           // We ran out of space while unrolling the values for this block
+          logUnrollFailureMessage(blockId, vector.estimateSize())
           Right(vector.iterator ++ values)
         }

@@ -424,7 +423,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    * Reserve additional memory for unrolling blocks used by this thread.
    * Return whether the request is granted.
    */
-  private[spark] def reserveUnrollMemoryForThisThread(memory: Long): Boolean = {
+  def reserveUnrollMemoryForThisThread(memory: Long): Boolean = {
     accountingLock.synchronized {
       val granted = freeMemory > currentUnrollMemory + memory
       if (granted) {
@@ -439,7 +438,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    * Release memory used by this thread for unrolling blocks.
    * If the amount is not specified, remove the current thread's allocation altogether.
    */
-  private[spark] def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = {
+  def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = {
     val threadId = Thread.currentThread().getId
     accountingLock.synchronized {
       if (memory < 0) {
@@ -457,16 +456,50 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   /**
    * Return the amount of memory currently occupied for unrolling blocks across all threads.
    */
-  private[spark] def currentUnrollMemory: Long = accountingLock.synchronized {
+  def currentUnrollMemory: Long = accountingLock.synchronized {
     unrollMemoryMap.values.sum
   }

   /**
    * Return the amount of memory currently occupied for unrolling blocks by this thread.
    */
-  private[spark] def currentUnrollMemoryForThisThread: Long = accountingLock.synchronized {
+  def currentUnrollMemoryForThisThread: Long = accountingLock.synchronized {
     unrollMemoryMap.getOrElse(Thread.currentThread().getId, 0L)
   }
+
+  /**
+   * Return the number of threads currently unrolling blocks.
+   */
+  def numThreadsUnrolling: Int = accountingLock.synchronized { unrollMemoryMap.keys.size }
+
+  /**
+   * Log information about current memory usage.
+   */
+  def logMemoryUsage(): Unit = {
+    val blocksMemory = currentMemory
+    val unrollMemory = currentUnrollMemory
+    val totalMemory = blocksMemory + unrollMemory
+    logInfo(
+      s"Memory use = ${Utils.bytesToString(blocksMemory)} (blocks) + " +
+      s"${Utils.bytesToString(unrollMemory)} (scratch space shared across " +
+      s"$numThreadsUnrolling thread(s)) = ${Utils.bytesToString(totalMemory)}. " +
+      s"Storage limit = ${Utils.bytesToString(maxMemory)}."
+    )
+  }
+
+  /**
+   * Log a warning for failing to unroll a block.
+   *
+   * @param blockId ID of the block we are trying to unroll.
+   * @param finalVectorSize Final size of the vector before unrolling failed.
+   */
+  def logUnrollFailureMessage(blockId: BlockId, finalVectorSize: Long): Unit = {
+    logWarning(
+      s"Not enough space to cache $blockId in memory! " +
+      s"(computed ${Utils.bytesToString(finalVectorSize)} so far)"
+    )
+    logMemoryUsage()
+  }
 }

 private[spark] case class ResultWithDroppedBlocks(
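
Dropping private[spark] from these accessors is what lets PythonRDD's finally block above call releaseUnrollMemoryForThisThread(), and the new helpers report memory as cached blocks plus per-thread unroll scratch space. A standalone sketch of that accounting, with hypothetical names and plain string formatting in place of accountingLock and Utils.bytesToString:

import scala.collection.mutable

// Sketch of the bookkeeping behind logMemoryUsage(): unroll scratch is
// tracked per thread ID and summed for the log line. Hypothetical class,
// not Spark's MemoryStore.
class UnrollAccounting(maxMemory: Long) {
  private val unrollMemoryMap = mutable.Map[Long, Long]()
  private var blocksMemory = 0L // bytes held by cached blocks

  def reserveUnrollMemoryForThisThread(memory: Long): Boolean = synchronized {
    // Grant only if blocks + all threads' scratch + this request still fit.
    val free = maxMemory - blocksMemory - currentUnrollMemory
    val granted = free > memory
    if (granted) {
      val id = Thread.currentThread().getId
      unrollMemoryMap(id) = unrollMemoryMap.getOrElse(id, 0L) + memory
    }
    granted
  }

  def currentUnrollMemory: Long = synchronized { unrollMemoryMap.values.sum }

  def numThreadsUnrolling: Int = synchronized { unrollMemoryMap.keys.size }

  // Shape of the new log line: blocks + shared scratch = total, vs. limit.
  def memoryUsage: String = synchronized {
    val unroll = currentUnrollMemory
    s"Memory use = $blocksMemory (blocks) + $unroll (scratch space shared " +
      s"across $numThreadsUnrolling thread(s)) = ${blocksMemory + unroll}. " +
      s"Storage limit = $maxMemory."
  }
}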

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala (−1)

@@ -217,7 +217,6 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {

   /** Generate jobs and perform checkpoint for the given `time`. */
   private def generateJobs(time: Time) {
-    SparkEnv.set(ssc.env)
     Try(graph.generateJobs(time)) match {
       case Success(jobs) =>
         val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala (−1)

@@ -138,7 +138,6 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     }
     jobSet.handleJobStart(job)
     logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
-    SparkEnv.set(ssc.env)
   }

   private def handleJobCompletion(job: Job) {

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala (−1)

@@ -202,7 +202,6 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
   @transient val thread = new Thread() {
     override def run() {
       try {
-        SparkEnv.set(env)
         startReceivers()
       } catch {
         case ie: InterruptedException => logInfo("ReceiverLauncher interrupted")

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala (+16 −10)

@@ -40,6 +40,7 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
   private var rpc: YarnRPC = null
   private var resourceManager: AMRMProtocol = _
   private var uiHistoryAddress: String = _
+  private var registered: Boolean = false

   override def register(
       conf: YarnConfiguration,
@@ -51,8 +52,11 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
     this.rpc = YarnRPC.create(conf)
     this.uiHistoryAddress = uiHistoryAddress

-    resourceManager = registerWithResourceManager(conf)
-    registerApplicationMaster(uiAddress)
+    synchronized {
+      resourceManager = registerWithResourceManager(conf)
+      registerApplicationMaster(uiAddress)
+      registered = true
+    }

     new YarnAllocationHandler(conf, sparkConf, resourceManager, getAttemptId(), args,
       preferredNodeLocations, securityMgr)
@@ -66,14 +70,16 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
     appAttemptId
   }

-  override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") = {
-    val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
-      .asInstanceOf[FinishApplicationMasterRequest]
-    finishReq.setAppAttemptId(getAttemptId())
-    finishReq.setFinishApplicationStatus(status)
-    finishReq.setDiagnostics(diagnostics)
-    finishReq.setTrackingUrl(uiHistoryAddress)
-    resourceManager.finishApplicationMaster(finishReq)
+  override def unregister(status: FinalApplicationStatus, diagnostics: String = "") = synchronized {
+    if (registered) {
+      val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+        .asInstanceOf[FinishApplicationMasterRequest]
+      finishReq.setAppAttemptId(getAttemptId())
+      finishReq.setFinishApplicationStatus(status)
+      finishReq.setDiagnostics(diagnostics)
+      finishReq.setTrackingUrl(uiHistoryAddress)
+      resourceManager.finishApplicationMaster(finishReq)
+    }
   }

   override def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String) = {
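
The rename from shutdown to unregister arrives with a registered flag, so an ApplicationMaster that fails before registering no longer attempts to unregister itself, and both paths synchronize on the same lock so the flag stays consistent with the RM calls. A generic sketch of the guard, with a hypothetical AmClient in place of the YARN API:

// Sketch: unregister() is a no-op unless register() completed,
// mirroring the registered flag added to YarnRMClientImpl above.
class AmClient {
  private var registered = false

  def register(): Unit = synchronized {
    // ... RPC handshake with the ResourceManager would happen here ...
    registered = true
  }

  def unregister(diagnostics: String = ""): Unit = synchronized {
    if (registered) {
      // ... build and send the FinishApplicationMasterRequest here ...
    }
    // Like the original, the flag is left set; the call is simply
    // skipped when registration never happened (e.g. the AM failed
    // before register() ran).
  }
}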
