
Commit 802261c

Merge pull request apache#7 from apache/master

merge latest spark

2 parents d00303b + e87bf37

149 files changed: +2760, -1367 lines


core/src/main/scala/org/apache/spark/SparkStatusTracker.scala

+1, -2

@@ -45,8 +45,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
    */
   def getJobIdsForGroup(jobGroup: String): Array[Int] = {
     jobProgressListener.synchronized {
-      val jobData = jobProgressListener.jobIdToData.valuesIterator
-      jobData.filter(_.jobGroup.orNull == jobGroup).map(_.jobId).toArray
+      jobProgressListener.jobGroupToJobIds.getOrElse(jobGroup, Seq.empty).toArray
     }
   }

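The change above replaces a linear scan over all tracked jobs with a lookup in a jobGroupToJobIds index maintained by the listener. The following standalone sketch (a hypothetical JobIndex class, not Spark's JobProgressListener) illustrates why the two lookups return the same IDs while the indexed one avoids touching every job:

import scala.collection.mutable

// Hypothetical, simplified stand-in for the listener state; for illustration only.
class JobIndex {
  private val jobIdToGroup = mutable.Map[Int, String]()
  private val jobGroupToJobIds = mutable.Map[String, mutable.Set[Int]]()

  def registerJob(jobId: Int, group: String): Unit = synchronized {
    jobIdToGroup(jobId) = group
    // Keep the reverse index up to date so group lookups stay cheap.
    jobGroupToJobIds.getOrElseUpdate(group, mutable.Set.empty) += jobId
  }

  // Old approach: filter every job's group (cost grows with the number of jobs).
  def idsByScan(group: String): Array[Int] = synchronized {
    jobIdToGroup.collect { case (id, g) if g == group => id }.toArray
  }

  // New approach: a single map lookup, mirroring jobGroupToJobIds.getOrElse(...).
  def idsByIndex(group: String): Array[Int] = synchronized {
    jobGroupToJobIds.getOrElse(group, mutable.Set.empty[Int]).toArray
  }
}
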
core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala

+2

@@ -101,6 +101,8 @@ private[deploy] object DeployMessages {
   case class RegisterApplication(appDescription: ApplicationDescription)
     extends DeployMessage
 
+  case class UnregisterApplication(appId: String)
+
   case class MasterChangeAcknowledged(appId: String)
 
   // Master to AppClient

core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala

+1

@@ -157,6 +157,7 @@ private[spark] class AppClient(
 
       case StopAppClient =>
        markDead("Application has been stopped.")
+        master ! UnregisterApplication(appId)
        sender ! true
        context.stop(self)
    }

core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala

+5, -1

@@ -91,7 +91,7 @@ private[deploy] class ApplicationInfo(
     }
   }
 
-  private[master] val requestedCores = desc.maxCores.getOrElse(defaultCores)
+  private val requestedCores = desc.maxCores.getOrElse(defaultCores)
 
   private[master] def coresLeft: Int = requestedCores - coresGranted
 
@@ -111,6 +111,10 @@ private[deploy] class ApplicationInfo(
     endTime = System.currentTimeMillis()
   }
 
+  private[master] def isFinished: Boolean = {
+    state != ApplicationState.WAITING && state != ApplicationState.RUNNING
+  }
+
   def duration: Long = {
     if (endTime != -1) {
       endTime - startTime

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

+9, -1

@@ -339,7 +339,11 @@ private[master] class Master(
         if (ExecutorState.isFinished(state)) {
           // Remove this executor from the worker and app
           logInfo(s"Removing executor ${exec.fullId} because it is $state")
-          appInfo.removeExecutor(exec)
+          // If an application has already finished, preserve its
+          // state to display its information properly on the UI
+          if (!appInfo.isFinished) {
+            appInfo.removeExecutor(exec)
+          }
           exec.worker.removeExecutor(exec)
 
           val normalExit = exitStatus == Some(0)
@@ -428,6 +432,10 @@ private[master] class Master(
       if (canCompleteRecovery) { completeRecovery() }
     }
 
+    case UnregisterApplication(applicationId) =>
+      logInfo(s"Received unregister request from application $applicationId")
+      idToApp.get(applicationId).foreach(finishApplication)
+
    case DisassociatedEvent(_, address, _) => {
      // The disconnected client could've been either a worker or an app; remove whichever it was
      logInfo(s"$address got disassociated, removing it.")

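Taken together with the DeployMessage and AppClient changes above, the new UnregisterApplication message lets a stopping client tell the master to finish its application explicitly instead of waiting for a disassociation event. Below is a minimal, Akka-free sketch of that message flow; MasterSketch and its finishApplication stub are illustrative only, not Spark code (the real handler runs inside the Master actor and is reached via master ! UnregisterApplication(appId)):

import scala.collection.mutable

// Illustrative message and handler only.
case class UnregisterApplication(appId: String)

class MasterSketch {
  private val idToApp = mutable.Map[String, String]()   // appId -> application record (simplified)

  def receive(msg: Any): Unit = msg match {
    case UnregisterApplication(applicationId) =>
      println(s"Received unregister request from application $applicationId")
      // No-op when the application is unknown, matching idToApp.get(...).foreach(finishApplication).
      idToApp.get(applicationId).foreach(finishApplication)
    case _ => // other messages elided
  }

  private def finishApplication(app: String): Unit = {
    // Stub: the real Master also removes executors, persisted data, and UI state here.
    println(s"Finishing application $app")
  }
}
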
core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala

+6, -26

@@ -75,16 +75,12 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
     val workers = state.workers.sortBy(_.id)
     val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers)
 
-    val activeAppHeaders = Seq("Application ID", "Name", "Cores in Use",
-      "Cores Requested", "Memory per Node", "Submitted Time", "User", "State", "Duration")
+    val appHeaders = Seq("Application ID", "Name", "Cores", "Memory per Node", "Submitted Time",
+      "User", "State", "Duration")
     val activeApps = state.activeApps.sortBy(_.startTime).reverse
-    val activeAppsTable = UIUtils.listingTable(activeAppHeaders, activeAppRow, activeApps)
-
-    val completedAppHeaders = Seq("Application ID", "Name", "Cores Requested", "Memory per Node",
-      "Submitted Time", "User", "State", "Duration")
+    val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps)
     val completedApps = state.completedApps.sortBy(_.endTime).reverse
-    val completedAppsTable = UIUtils.listingTable(completedAppHeaders, completeAppRow,
-      completedApps)
+    val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)
 
     val driverHeaders = Seq("Submission ID", "Submitted Time", "Worker", "State", "Cores",
       "Memory", "Main Class")
@@ -191,7 +187,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
     </tr>
   }
 
-  private def appRow(app: ApplicationInfo, active: Boolean): Seq[Node] = {
+  private def appRow(app: ApplicationInfo): Seq[Node] = {
     val killLink = if (parent.killEnabled &&
       (app.state == ApplicationState.RUNNING || app.state == ApplicationState.WAITING)) {
       val killLinkUri = s"app/kill?id=${app.id}&terminate=true"
@@ -201,7 +197,6 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
        (<a href={killLinkUri} onclick={confirm}>kill</a>)
       </span>
     }
-
     <tr>
       <td>
         <a href={"app?appId=" + app.id}>{app.id}</a>
@@ -210,15 +205,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
       <td>
         <a href={app.desc.appUiUrl}>{app.desc.name}</a>
       </td>
-      {
-        if (active) {
-          <td>
-            {app.coresGranted}
-          </td>
-        }
-      }
       <td>
-        {if (app.requestedCores == Int.MaxValue) "*" else app.requestedCores}
+        {app.coresGranted}
       </td>
       <td sorttable_customkey={app.desc.memoryPerSlave.toString}>
         {Utils.megabytesToString(app.desc.memoryPerSlave)}
@@ -230,14 +218,6 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
     </tr>
   }
 
-  private def activeAppRow(app: ApplicationInfo): Seq[Node] = {
-    appRow(app, active = true)
-  }
-
-  private def completeAppRow(app: ApplicationInfo): Seq[Node] = {
-    appRow(app, active = false)
-  }
-
   private def driverRow(driver: DriverInfo): Seq[Node] = {
     val killLink = if (parent.killEnabled &&
       (driver.state == DriverState.RUNNING ||

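The UI change above collapses two nearly identical header lists and row renderers into one, so the active and completed tables differ only in the rows they are fed. A rough sketch of that idea with a string-based table helper; listingTableSketch and AppSketch are hypothetical, not UIUtils.listingTable:

object ListingTableSketch {
  // Hypothetical helper to show why a single header list and row function suffice.
  def listingTableSketch[T](headers: Seq[String], makeRow: T => Seq[String], rows: Seq[T]): String = {
    val head = headers.mkString("| ", " | ", " |")
    val body = rows.map(r => makeRow(r).mkString("| ", " | ", " |"))
    (head +: body).mkString("\n")
  }

  case class AppSketch(id: String, name: String, coresGranted: Int)

  def main(args: Array[String]): Unit = {
    val appHeaders = Seq("Application ID", "Name", "Cores")
    val appRowSketch = (a: AppSketch) => Seq(a.id, a.name, a.coresGranted.toString)

    // Both tables reuse the same pieces, mirroring the unified appRow in the diff.
    val activeAppsTable = listingTableSketch(appHeaders, appRowSketch, Seq(AppSketch("app-1", "demo", 4)))
    val completedAppsTable = listingTableSketch(appHeaders, appRowSketch, Seq.empty)
    println(activeAppsTable)
    println(completedAppsTable)
  }
}
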
core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala

+5, -3

@@ -81,9 +81,11 @@ class TaskInfo(
 
   def status: String = {
     if (running) {
-      "RUNNING"
-    } else if (gettingResult) {
-      "GET RESULT"
+      if (gettingResult) {
+        "GET RESULT"
+      } else {
+        "RUNNING"
+      }
     } else if (failed) {
       "FAILED"
     } else if (successful) {

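The reordering above matters because a task that is gettingResult is usually still running, so with the old ordering the RUNNING branch always won. A tiny self-contained check of the corrected precedence; the trailing FAILED/SUCCESS/UNKNOWN branches are assumed to match the rest of TaskInfo.status:

object TaskStatusSketch {
  def status(running: Boolean, gettingResult: Boolean, failed: Boolean, successful: Boolean): String = {
    if (running) {
      if (gettingResult) "GET RESULT" else "RUNNING"
    } else if (failed) {
      "FAILED"
    } else if (successful) {
      "SUCCESS"
    } else {
      "UNKNOWN"
    }
  }

  def main(args: Array[String]): Unit = {
    // A task that is running and already fetching its result now reports GET RESULT.
    assert(status(running = true, gettingResult = true, failed = false, successful = false) == "GET RESULT")
    // A plain running task is unaffected.
    assert(status(running = true, gettingResult = false, failed = false, successful = false) == "RUNNING")
    println("ok")
  }
}
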
core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala

+4

@@ -112,6 +112,7 @@ class FileShuffleBlockManager(conf: SparkConf)
       private val shuffleState = shuffleStates(shuffleId)
       private var fileGroup: ShuffleFileGroup = null
 
+      val openStartTime = System.nanoTime
       val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
         fileGroup = getUnusedFileGroup()
         Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
@@ -135,6 +136,9 @@ class FileShuffleBlockManager(conf: SparkConf)
           blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)
         }
       }
+      // Creating the file to write to and creating a disk writer both involve interacting with
+      // the disk, so should be included in the shuffle write time.
+      writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
 
       override def releaseWriters(success: Boolean) {
         if (consolidateShuffleFiles) {

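The addition above brackets the creation of all per-bucket disk writers with System.nanoTime and folds the elapsed time into the shuffle write metric. A generic sketch of that timing pattern follows; WriteMetricsSketch and timedOpen are stand-ins for illustration, not Spark's ShuffleWriteMetrics API:

object ShuffleTimingSketch {
  // Stand-in metric accumulator.
  class WriteMetricsSketch {
    private var shuffleWriteNanos = 0L
    def incShuffleWriteTime(nanos: Long): Unit = { shuffleWriteNanos += nanos }
    def total: Long = shuffleWriteNanos
  }

  // Run a block of writer-opening work and charge its wall-clock time to the metric,
  // mirroring openStartTime / incShuffleWriteTime in the diff.
  def timedOpen[T](metrics: WriteMetricsSketch)(openWriters: => T): T = {
    val openStartTime = System.nanoTime
    try openWriters
    finally metrics.incShuffleWriteTime(System.nanoTime - openStartTime)
  }

  def main(args: Array[String]): Unit = {
    val metrics = new WriteMetricsSketch
    val writers = timedOpen(metrics) { Array.fill(8)("writer") }   // pretend to open 8 writers
    println(s"opened ${writers.length} writers in ${metrics.total} ns")
  }
}
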
core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala

+3

@@ -63,6 +63,9 @@ private[spark] class SortShuffleWriter[K, V, C](
       sorter.insertAll(records)
     }
 
+    // Don't bother including the time to open the merged output file in the shuffle write time,
+    // because it just opens a single file, so is typically too fast to measure accurately
+    // (see SPARK-3570).
     val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
     val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)
     val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

+18, -5

@@ -535,9 +535,14 @@ private[spark] class BlockManager(
           /* We'll store the bytes in memory if the block's storage level includes
            * "memory serialized", or if it should be cached as objects in memory
            * but we only requested its serialized bytes. */
-          val copyForMemory = ByteBuffer.allocate(bytes.limit)
-          copyForMemory.put(bytes)
-          memoryStore.putBytes(blockId, copyForMemory, level)
+          memoryStore.putBytes(blockId, bytes.limit, () => {
+            // https://issues.apache.org/jira/browse/SPARK-6076
+            // If the file size is bigger than the free memory, OOM will happen. So if we cannot
+            // put it into MemoryStore, copyForMemory should not be created. That's why this
+            // action is put into a `() => ByteBuffer` and created lazily.
+            val copyForMemory = ByteBuffer.allocate(bytes.limit)
+            copyForMemory.put(bytes)
+          })
           bytes.rewind()
         }
         if (!asBlockResult) {
@@ -991,15 +996,23 @@ private[spark] class BlockManager(
     putIterator(blockId, Iterator(value), level, tellMaster)
   }
 
+  def dropFromMemory(
+      blockId: BlockId,
+      data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
+    dropFromMemory(blockId, () => data)
+  }
+
   /**
    * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
    * store reaches its limit and needs to free up space.
    *
+   * If `data` is not put on disk, it won't be created.
+   *
    * Return the block status if the given block has been updated, else None.
    */
  def dropFromMemory(
      blockId: BlockId,
-      data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
+      data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
 
    logInfo(s"Dropping block $blockId from memory")
    val info = blockInfo.get(blockId).orNull
@@ -1023,7 +1036,7 @@ private[spark] class BlockManager(
       // Drop to disk, if storage level requires
       if (level.useDisk && !diskStore.contains(blockId)) {
         logInfo(s"Writing block $blockId to disk")
-        data match {
+        data() match {
          case Left(elements) =>
            diskStore.putArray(blockId, elements, level, returnValues = false)
          case Right(bytes) =>

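The key idea in the BlockManager change is that the in-memory copy is wrapped in a () => ByteBuffer thunk, so the potentially huge allocation only happens once the store has decided the block fits (SPARK-6076). Below is a minimal sketch of that check-then-materialize pattern using a toy TinyMemoryStore rather than Spark's MemoryStore; names and sizes are illustrative:

import java.nio.ByteBuffer

// Toy store: admit a block only if its declared size fits, and only then run the thunk.
class TinyMemoryStore(private var freeBytes: Long) {
  def putBytes(size: Long, makeBytes: () => ByteBuffer): Boolean = synchronized {
    if (size <= freeBytes) {
      makeBytes()              // the copy is allocated only on the success path
      freeBytes -= size
      true
    } else {
      false                    // thunk never evaluated: no large allocation, no OOM risk
    }
  }
}

object LazyCopyExample {
  def main(args: Array[String]): Unit = {
    val bytes = ByteBuffer.wrap(new Array[Byte](16))
    val store = new TinyMemoryStore(freeBytes = 8)     // deliberately too small for this block
    val stored = store.putBytes(bytes.limit, () => {
      val copyForMemory = ByteBuffer.allocate(bytes.limit)
      copyForMemory.put(bytes)
      copyForMemory
    })
    println(s"stored = $stored")   // false, and the 16-byte copy was never created
  }
}
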
core/src/main/scala/org/apache/spark/storage/MemoryStore.scala

+37, -6

@@ -98,6 +98,26 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     }
   }
 
+  /**
+   * Use `size` to test if there is enough space in MemoryStore. If so, create the ByteBuffer and
+   * put it into MemoryStore. Otherwise, the ByteBuffer won't be created.
+   *
+   * The caller should guarantee that `size` is correct.
+   */
+  def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = {
+    // Work on a duplicate - since the original input might be used elsewhere.
+    lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
+    val putAttempt = tryToPut(blockId, () => bytes, size, deserialized = false)
+    val data =
+      if (putAttempt.success) {
+        assert(bytes.limit == size)
+        Right(bytes.duplicate())
+      } else {
+        null
+      }
+    PutResult(size, data, putAttempt.droppedBlocks)
+  }
+
   override def putArray(
       blockId: BlockId,
       values: Array[Any],
@@ -312,11 +332,22 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     blockId.asRDDId.map(_.rddId)
   }
 
+  private def tryToPut(
+      blockId: BlockId,
+      value: Any,
+      size: Long,
+      deserialized: Boolean): ResultWithDroppedBlocks = {
+    tryToPut(blockId, () => value, size, deserialized)
+  }
+
   /**
    * Try to put in a set of values, if we can free up enough space. The value should either be
    * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
    * must also be passed by the caller.
    *
+   * `value` will be lazily created. If it cannot be put into MemoryStore or disk, `value` won't be
+   * created to avoid OOM since it may be a big ByteBuffer.
+   *
    * Synchronize on `accountingLock` to ensure that all the put requests and its associated block
    * dropping is done by only on thread at a time. Otherwise while one thread is dropping
    * blocks to free memory for one block, another thread may use up the freed space for
@@ -326,7 +357,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    */
  private def tryToPut(
      blockId: BlockId,
-      value: Any,
+      value: () => Any,
      size: Long,
      deserialized: Boolean): ResultWithDroppedBlocks = {
 
@@ -345,7 +376,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       droppedBlocks ++= freeSpaceResult.droppedBlocks
 
       if (enoughFreeSpace) {
-        val entry = new MemoryEntry(value, size, deserialized)
+        val entry = new MemoryEntry(value(), size, deserialized)
         entries.synchronized {
           entries.put(blockId, entry)
           currentMemory += size
@@ -357,12 +388,12 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       } else {
         // Tell the block manager that we couldn't put it in memory so that it can drop it to
         // disk if the block allows disk storage.
-        val data = if (deserialized) {
-          Left(value.asInstanceOf[Array[Any]])
+        lazy val data = if (deserialized) {
+          Left(value().asInstanceOf[Array[Any]])
         } else {
-          Right(value.asInstanceOf[ByteBuffer].duplicate())
+          Right(value().asInstanceOf[ByteBuffer].duplicate())
         }
-        val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
+        val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
         droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
       }
       // Release the unroll memory used because we no longer need the underlying Array

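On the MemoryStore side the same goal is reached with a lazy val behind the thunk, so the value is built at most once and only if some path (memory entry or disk drop) actually needs it. A small demonstration of that idiom, independent of Spark:

object LazyThunkSketch {
  def expensiveValue(): Array[Byte] = {
    println("building the big value")   // side effect shows when construction happens
    new Array[Byte](1024)
  }

  def main(args: Array[String]): Unit = {
    lazy val data = expensiveValue()        // nothing built yet
    val thunk: () => Array[Byte] = () => data

    println("before any use")
    thunk()                                 // "building the big value" printed here, once
    thunk()                                 // memoized: no second construction
  }
}
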
core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala

+9

@@ -17,6 +17,8 @@
 
 package org.apache.spark.ui
 
+import java.util.concurrent.Semaphore
+
 import scala.util.Random
 
 import org.apache.spark.{SparkConf, SparkContext}
@@ -88,6 +90,8 @@ private[spark] object UIWorkloadGenerator {
       ("Job with delays", baseData.map(x => Thread.sleep(100)).count)
     )
 
+    val barrier = new Semaphore(-nJobSet * jobs.size + 1)
+
     (1 to nJobSet).foreach { _ =>
       for ((desc, job) <- jobs) {
         new Thread {
@@ -99,12 +103,17 @@ private[spark] object UIWorkloadGenerator {
            } catch {
              case e: Exception =>
                println("Job Failed: " + desc)
+            } finally {
+              barrier.release()
            }
          }
        }.start
        Thread.sleep(INTER_JOB_WAIT_MS)
      }
    }
+
+    // Waiting for threads.
+    barrier.acquire()
    sc.stop()
  }
}

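The Semaphore trick above starts with -(number of jobs) + 1 permits, so acquire() can only succeed after every job thread has called release() in its finally block, whether the job succeeded or failed. A standalone illustration of that negative-permit barrier (java.util.concurrent.Semaphore explicitly allows a negative initial permit count):

import java.util.concurrent.Semaphore

object SemaphoreBarrierSketch {
  def main(args: Array[String]): Unit = {
    val nTasks = 4
    // With (1 - nTasks) initial permits, acquire() blocks until all nTasks releases have happened.
    val barrier = new Semaphore(-nTasks + 1)

    (1 to nTasks).foreach { i =>
      new Thread {
        override def run(): Unit = {
          try {
            println(s"task $i done")
          } finally {
            barrier.release()               // always counts the task, even if it fails
          }
        }
      }.start()
    }

    barrier.acquire()                        // returns only after every task has released
    println("all tasks finished")
  }
}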