Skip to content

Commit cd000b0

Browse files
committed
Merge github.com:apache/spark into ui-refactor
2 parents 7d57444 + 47ebea5 commit cd000b0

File tree

111 files changed

+5198
-1204
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

111 files changed

+5198
-1204
lines changed

core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@
150150
<artifactId>json4s-jackson_${scala.binary.version}</artifactId>
151151
<version>3.2.6</version>
152152
<!-- see also exclusion for lift-json; this is necessary since it depends on
153-
scala-library and scalap 2.10.0, but we use 2.10.3, and only override
153+
scala-library and scalap 2.10.0, but we use 2.10.4, and only override
154154
scala-library -->
155155
<exclusions>
156156
<exclusion>

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ class DAGScheduler(
8484
private[scheduler] val stageIdToJobIds = new TimeStampedHashMap[Int, HashSet[Int]]
8585
private[scheduler] val stageIdToStage = new TimeStampedHashMap[Int, Stage]
8686
private[scheduler] val shuffleToMapStage = new TimeStampedHashMap[Int, Stage]
87-
private[scheduler] val stageIdToActiveJob = new HashMap[Int, ActiveJob]
87+
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
8888
private[scheduler] val resultStageToJob = new HashMap[Stage, ActiveJob]
8989
private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]
9090

@@ -536,7 +536,7 @@ class DAGScheduler(
536536
listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
537537
runLocally(job)
538538
} else {
539-
stageIdToActiveJob(jobId) = job
539+
jobIdToActiveJob(jobId) = job
540540
activeJobs += job
541541
resultStageToJob(finalStage) = job
542542
listenerBus.post(
@@ -559,7 +559,7 @@ class DAGScheduler(
559559
// Cancel all running jobs.
560560
runningStages.map(_.jobId).foreach(handleJobCancellation)
561561
activeJobs.clear() // These should already be empty by this point,
562-
stageIdToActiveJob.clear() // but just in case we lost track of some jobs...
562+
jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
563563

564564
case ExecutorAdded(execId, host) =>
565565
handleExecutorAdded(execId, host)
@@ -569,7 +569,6 @@ class DAGScheduler(
569569

570570
case BeginEvent(task, taskInfo) =>
571571
for (
572-
job <- stageIdToActiveJob.get(task.stageId);
573572
stage <- stageIdToStage.get(task.stageId);
574573
stageInfo <- stageToInfos.get(stage)
575574
) {
@@ -697,7 +696,7 @@ class DAGScheduler(
697696
private def activeJobForStage(stage: Stage): Option[Int] = {
698697
if (stageIdToJobIds.contains(stage.id)) {
699698
val jobsThatUseStage: Array[Int] = stageIdToJobIds(stage.id).toArray.sorted
700-
jobsThatUseStage.find(stageIdToActiveJob.contains)
699+
jobsThatUseStage.find(jobIdToActiveJob.contains)
701700
} else {
702701
None
703702
}
@@ -750,8 +749,8 @@ class DAGScheduler(
750749
}
751750
}
752751

753-
val properties = if (stageIdToActiveJob.contains(jobId)) {
754-
stageIdToActiveJob(stage.jobId).properties
752+
val properties = if (jobIdToActiveJob.contains(jobId)) {
753+
jobIdToActiveJob(stage.jobId).properties
755754
} else {
756755
// this stage will be assigned to "default" pool
757756
null
@@ -827,7 +826,7 @@ class DAGScheduler(
827826
job.numFinished += 1
828827
// If the whole job has finished, remove it
829828
if (job.numFinished == job.numPartitions) {
830-
stageIdToActiveJob -= stage.jobId
829+
jobIdToActiveJob -= stage.jobId
831830
activeJobs -= job
832831
resultStageToJob -= stage
833832
markStageAsFinished(stage)
@@ -986,11 +985,11 @@ class DAGScheduler(
986985
val independentStages = removeJobAndIndependentStages(jobId)
987986
independentStages.foreach(taskScheduler.cancelTasks)
988987
val error = new SparkException("Job %d cancelled".format(jobId))
989-
val job = stageIdToActiveJob(jobId)
988+
val job = jobIdToActiveJob(jobId)
990989
job.listener.jobFailed(error)
991990
jobIdToStageIds -= jobId
992991
activeJobs -= job
993-
stageIdToActiveJob -= jobId
992+
jobIdToActiveJob -= jobId
994993
listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, job.finalStage.id)))
995994
}
996995
}
@@ -1011,7 +1010,7 @@ class DAGScheduler(
10111010
val error = new SparkException("Job aborted: " + reason)
10121011
job.listener.jobFailed(error)
10131012
jobIdToStageIdsRemove(job.jobId)
1014-
stageIdToActiveJob -= resultStage.jobId
1013+
jobIdToActiveJob -= resultStage.jobId
10151014
activeJobs -= job
10161015
resultStageToJob -= resultStage
10171016
listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, failedStage.id)))

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 2 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ private[spark] object JsonProtocol {
195195
taskMetrics.shuffleWriteMetrics.map(shuffleWriteMetricsToJson).getOrElse(JNothing)
196196
val updatedBlocks = taskMetrics.updatedBlocks.map { blocks =>
197197
JArray(blocks.toList.map { case (id, status) =>
198-
("Block ID" -> blockIdToJson(id)) ~
198+
("Block ID" -> id.toString) ~
199199
("Status" -> blockStatusToJson(status))
200200
})
201201
}.getOrElse(JNothing)
@@ -284,35 +284,6 @@ private[spark] object JsonProtocol {
284284
("Replication" -> storageLevel.replication)
285285
}
286286

287-
def blockIdToJson(blockId: BlockId): JValue = {
288-
val blockType = Utils.getFormattedClassName(blockId)
289-
val json: JObject = blockId match {
290-
case rddBlockId: RDDBlockId =>
291-
("RDD ID" -> rddBlockId.rddId) ~
292-
("Split Index" -> rddBlockId.splitIndex)
293-
case shuffleBlockId: ShuffleBlockId =>
294-
("Shuffle ID" -> shuffleBlockId.shuffleId) ~
295-
("Map ID" -> shuffleBlockId.mapId) ~
296-
("Reduce ID" -> shuffleBlockId.reduceId)
297-
case broadcastBlockId: BroadcastBlockId =>
298-
"Broadcast ID" -> broadcastBlockId.broadcastId
299-
case broadcastHelperBlockId: BroadcastHelperBlockId =>
300-
("Broadcast Block ID" -> blockIdToJson(broadcastHelperBlockId.broadcastId)) ~
301-
("Helper Type" -> broadcastHelperBlockId.hType)
302-
case taskResultBlockId: TaskResultBlockId =>
303-
"Task ID" -> taskResultBlockId.taskId
304-
case streamBlockId: StreamBlockId =>
305-
("Stream ID" -> streamBlockId.streamId) ~
306-
("Unique ID" -> streamBlockId.uniqueId)
307-
case tempBlockId: TempBlockId =>
308-
val uuid = UUIDToJson(tempBlockId.id)
309-
"Temp ID" -> uuid
310-
case testBlockId: TestBlockId =>
311-
"Test ID" -> testBlockId.id
312-
}
313-
("Type" -> blockType) ~ json
314-
}
315-
316287
def blockStatusToJson(blockStatus: BlockStatus): JValue = {
317288
val storageLevel = storageLevelToJson(blockStatus.storageLevel)
318289
("Storage Level" -> storageLevel) ~
@@ -513,7 +484,7 @@ private[spark] object JsonProtocol {
513484
Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
514485
metrics.updatedBlocks = Utils.jsonOption(json \ "Updated Blocks").map { value =>
515486
value.extract[List[JValue]].map { block =>
516-
val id = blockIdFromJson(block \ "Block ID")
487+
val id = BlockId((block \ "Block ID").extract[String])
517488
val status = blockStatusFromJson(block \ "Status")
518489
(id, status)
519490
}
@@ -616,50 +587,6 @@ private[spark] object JsonProtocol {
616587
StorageLevel(useDisk, useMemory, deserialized, replication)
617588
}
618589

619-
def blockIdFromJson(json: JValue): BlockId = {
620-
val rddBlockId = Utils.getFormattedClassName(RDDBlockId)
621-
val shuffleBlockId = Utils.getFormattedClassName(ShuffleBlockId)
622-
val broadcastBlockId = Utils.getFormattedClassName(BroadcastBlockId)
623-
val broadcastHelperBlockId = Utils.getFormattedClassName(BroadcastHelperBlockId)
624-
val taskResultBlockId = Utils.getFormattedClassName(TaskResultBlockId)
625-
val streamBlockId = Utils.getFormattedClassName(StreamBlockId)
626-
val tempBlockId = Utils.getFormattedClassName(TempBlockId)
627-
val testBlockId = Utils.getFormattedClassName(TestBlockId)
628-
629-
(json \ "Type").extract[String] match {
630-
case `rddBlockId` =>
631-
val rddId = (json \ "RDD ID").extract[Int]
632-
val splitIndex = (json \ "Split Index").extract[Int]
633-
new RDDBlockId(rddId, splitIndex)
634-
case `shuffleBlockId` =>
635-
val shuffleId = (json \ "Shuffle ID").extract[Int]
636-
val mapId = (json \ "Map ID").extract[Int]
637-
val reduceId = (json \ "Reduce ID").extract[Int]
638-
new ShuffleBlockId(shuffleId, mapId, reduceId)
639-
case `broadcastBlockId` =>
640-
val broadcastId = (json \ "Broadcast ID").extract[Long]
641-
new BroadcastBlockId(broadcastId)
642-
case `broadcastHelperBlockId` =>
643-
val broadcastBlockId =
644-
blockIdFromJson(json \ "Broadcast Block ID").asInstanceOf[BroadcastBlockId]
645-
val hType = (json \ "Helper Type").extract[String]
646-
new BroadcastHelperBlockId(broadcastBlockId, hType)
647-
case `taskResultBlockId` =>
648-
val taskId = (json \ "Task ID").extract[Long]
649-
new TaskResultBlockId(taskId)
650-
case `streamBlockId` =>
651-
val streamId = (json \ "Stream ID").extract[Int]
652-
val uniqueId = (json \ "Unique ID").extract[Long]
653-
new StreamBlockId(streamId, uniqueId)
654-
case `tempBlockId` =>
655-
val tempId = UUIDFromJson(json \ "Temp ID")
656-
new TempBlockId(tempId)
657-
case `testBlockId` =>
658-
val testId = (json \ "Test ID").extract[String]
659-
new TestBlockId(testId)
660-
}
661-
}
662-
663590
def blockStatusFromJson(json: JValue): BlockStatus = {
664591
val storageLevel = storageLevelFromJson(json \ "Storage Level")
665592
val memorySize = (json \ "Memory Size").extract[Long]

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
428428
assert(scheduler.pendingTasks.isEmpty)
429429
assert(scheduler.activeJobs.isEmpty)
430430
assert(scheduler.failedStages.isEmpty)
431-
assert(scheduler.stageIdToActiveJob.isEmpty)
431+
assert(scheduler.jobIdToActiveJob.isEmpty)
432432
assert(scheduler.jobIdToStageIds.isEmpty)
433433
assert(scheduler.stageIdToJobIds.isEmpty)
434434
assert(scheduler.stageIdToStage.isEmpty)

0 commit comments

Comments
 (0)