
Commit fc6a3e2

Author: Marcelo Vanzin

Merge branch 'master' into SPARK-4924

Conflicts:
    bin/spark-submit
    bin/spark-submit2.cmd

2 parents f26556b + 8782eb9, commit fc6a3e2

File tree: 21 files changed (+343, -82 lines)

core/src/main/scala/org/apache/spark/TaskNotSerializableException.scala

Lines changed: 25 additions & 0 deletions

@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * Exception thrown when a task cannot be serialized.
+ */
+private[spark] class TaskNotSerializableException(error: Throwable) extends Exception(error)

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 2 additions & 1 deletion
@@ -392,7 +392,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
         |  --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").
         |  --num-executors NUM         Number of executors to launch (Default: 2).
         |  --archives ARCHIVES         Comma separated list of archives to be extracted into the
-        |                              working directory of each executor.""".stripMargin
+        |                              working directory of each executor.
+      """.stripMargin
     )
     SparkSubmit.exitFn()
   }

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 0 additions & 20 deletions
@@ -866,26 +866,6 @@ class DAGScheduler(
     }

     if (tasks.size > 0) {
-      // Preemptively serialize a task to make sure it can be serialized. We are catching this
-      // exception here because it would be fairly hard to catch the non-serializable exception
-      // down the road, where we have several different implementations for local scheduler and
-      // cluster schedulers.
-      //
-      // We've already serialized RDDs and closures in taskBinary, but here we check for all other
-      // objects such as Partition.
-      try {
-        closureSerializer.serialize(tasks.head)
-      } catch {
-        case e: NotSerializableException =>
-          abortStage(stage, "Task not serializable: " + e.toString)
-          runningStages -= stage
-          return
-        case NonFatal(e) => // Other exceptions, such as IllegalArgumentException from Kryo.
-          abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")
-          runningStages -= stage
-          return
-      }
-
       logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
       stage.pendingTasks ++= tasks
       logDebug("New pending tasks: " + stage.pendingTasks)

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 37 additions & 17 deletions
@@ -31,6 +31,7 @@ import scala.util.Random
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+import org.apache.spark.scheduler.TaskLocality.TaskLocality
 import org.apache.spark.util.Utils
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
@@ -209,6 +210,40 @@
       .format(manager.taskSet.id, manager.parent.name))
   }

+  private def resourceOfferSingleTaskSet(
+      taskSet: TaskSetManager,
+      maxLocality: TaskLocality,
+      shuffledOffers: Seq[WorkerOffer],
+      availableCpus: Array[Int],
+      tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
+    var launchedTask = false
+    for (i <- 0 until shuffledOffers.size) {
+      val execId = shuffledOffers(i).executorId
+      val host = shuffledOffers(i).host
+      if (availableCpus(i) >= CPUS_PER_TASK) {
+        try {
+          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
+            tasks(i) += task
+            val tid = task.taskId
+            taskIdToTaskSetId(tid) = taskSet.taskSet.id
+            taskIdToExecutorId(tid) = execId
+            executorsByHost(host) += execId
+            availableCpus(i) -= CPUS_PER_TASK
+            assert(availableCpus(i) >= 0)
+            launchedTask = true
+          }
+        } catch {
+          case e: TaskNotSerializableException =>
+            logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
+            // Do not offer resources for this task, but don't throw an error to allow other
+            // task sets to be submitted.
+            return launchedTask
+        }
+      }
+    }
+    return launchedTask
+  }
+
   /**
    * Called by cluster manager to offer resources on slaves. We respond by asking our active task
    * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
@@ -251,23 +286,8 @@
     var launchedTask = false
     for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
       do {
-        launchedTask = false
-        for (i <- 0 until shuffledOffers.size) {
-          val execId = shuffledOffers(i).executorId
-          val host = shuffledOffers(i).host
-          if (availableCpus(i) >= CPUS_PER_TASK) {
-            for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
-              tasks(i) += task
-              val tid = task.taskId
-              taskIdToTaskSetId(tid) = taskSet.taskSet.id
-              taskIdToExecutorId(tid) = execId
-              executorsByHost(host) += execId
-              availableCpus(i) -= CPUS_PER_TASK
-              assert(availableCpus(i) >= 0)
-              launchedTask = true
-            }
-          }
-        }
+        launchedTask = resourceOfferSingleTaskSet(
+            taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
       } while (launchedTask)
     }

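
To make the control flow of resourceOfferSingleTaskSet easier to see in isolation, here is a hedged, self-contained miniature of the catch-and-continue pattern; the class and method names below are stand-ins, not Spark's classes. When every offer to one task set fails with a serialization error, offers simply stop for that set and the loop moves on, so other task sets can still launch.

import scala.collection.mutable.ArrayBuffer

// Stand-in for TaskNotSerializableException.
class TaskNotSerializableError(cause: Throwable) extends Exception(cause)

// Stand-in for a task set: offering a task may fail for every task in the set.
final case class MiniTaskSet(name: String, broken: Boolean) {
  def offer(executor: String): String =
    if (broken) throw new TaskNotSerializableError(new java.io.NotSerializableException(name))
    else s"$name on $executor"
}

object OfferLoop {
  def resourceOffers(taskSets: Seq[MiniTaskSet], executors: Seq[String]): Seq[String] = {
    val launched = ArrayBuffer.empty[String]
    for (ts <- taskSets) {
      try {
        for (exec <- executors) launched += ts.offer(exec)
      } catch {
        // Stop offering resources to this set, but do not rethrow, so the
        // remaining task sets are still considered.
        case _: TaskNotSerializableError => ()
      }
    }
    launched.toSeq
  }

  def main(args: Array[String]): Unit = {
    val sets = Seq(MiniTaskSet("bad", broken = true), MiniTaskSet("good", broken = false))
    // Only the offers from the "good" set are launched; the "bad" set is skipped.
    println(resourceOffers(sets, Seq("executor0", "executor1")))
  }
}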

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 14 additions & 4 deletions
@@ -18,12 +18,14 @@
 package org.apache.spark.scheduler

 import java.io.NotSerializableException
+import java.nio.ByteBuffer
 import java.util.Arrays

 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
 import scala.math.{min, max}
+import scala.util.control.NonFatal

 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
@@ -417,6 +419,7 @@ private[spark] class TaskSetManager(
   * @param host  the host Id of the offered resource
   * @param maxLocality the maximum locality we want to schedule the tasks at
   */
+  @throws[TaskNotSerializableException]
  def resourceOffer(
      execId: String,
      host: String,
@@ -456,10 +459,17 @@
        }
        // Serialize and return the task
        val startTime = clock.getTime()
-        // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
-        // we assume the task can be serialized without exceptions.
-        val serializedTask = Task.serializeWithDependencies(
-          task, sched.sc.addedFiles, sched.sc.addedJars, ser)
+        val serializedTask: ByteBuffer = try {
+          Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
+        } catch {
+          // If the task cannot be serialized, then there's no point to re-attempt the task,
+          // as it will always fail. So just abort the whole task-set.
+          case NonFatal(e) =>
+            val msg = s"Failed to serialize task $taskId, not attempting to retry it."
+            logError(msg, e)
+            abort(s"$msg Exception during serialization: $e")
+            throw new TaskNotSerializableException(e)
+        }
        if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
            !emittedTaskSizeWarning) {
          emittedTaskSizeWarning = true
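
The comment in the new catch block captures the other half of the design: serialization failures are deterministic, so retrying is pointless; the whole task set is aborted before a typed exception is thrown for the scheduler loop above to swallow. A rough sketch of that abort-then-rethrow shape under simplified, assumed names (this is not the real TaskSetManager):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.ByteBuffer

import scala.util.control.NonFatal

// Stand-in for TaskNotSerializableException.
class TaskNotSerializableError(cause: Throwable) extends Exception(cause)

final class MiniTaskSetManager {
  @volatile private var zombie = false      // loosely mirrors TaskSetManager.isZombie
  def isZombie: Boolean = zombie

  private def abort(message: String): Unit = {
    zombie = true
    println(s"Aborting task set: $message")
  }

  def serializeTask(task: AnyRef, taskId: Long): ByteBuffer =
    try {
      val bytes = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(bytes)
      out.writeObject(task)
      out.close()
      ByteBuffer.wrap(bytes.toByteArray)
    } catch {
      case NonFatal(e) =>
        // Retrying would fail identically, so give up on the whole set and
        // surface a typed exception for the caller to handle.
        abort(s"Failed to serialize task $taskId, not attempting to retry it. ($e)")
        throw new TaskNotSerializableError(e)
    }
}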

core/src/test/scala/org/apache/spark/SharedSparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
   var conf = new SparkConf(false)

   override def beforeAll() {
-    _sc = new SparkContext("local", "test", conf)
+    _sc = new SparkContext("local[4]", "test", conf)
     super.beforeAll()
   }

core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala

Lines changed: 21 additions & 0 deletions
@@ -17,6 +17,10 @@

 package org.apache.spark.rdd

+import java.io.{ObjectInputStream, ObjectOutputStream, IOException}
+
+import com.esotericsoftware.kryo.KryoException
+
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
@@ -887,6 +891,23 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(ancestors6.count(_.isInstanceOf[CyclicalDependencyRDD[_]]) === 3)
   }

+  test("task serialization exception should not hang scheduler") {
+    class BadSerializable extends Serializable {
+      @throws(classOf[IOException])
+      private def writeObject(out: ObjectOutputStream): Unit = throw new KryoException("Bad serialization")
+
+      @throws(classOf[IOException])
+      private def readObject(in: ObjectInputStream): Unit = {}
+    }
+    // Note that in the original bug, SPARK-4349, that this verifies, the job would only hang if there were
+    // more threads in the Spark Context than there were number of objects in this sequence.
+    intercept[Throwable] {
+      sc.parallelize(Seq(new BadSerializable, new BadSerializable)).collect
+    }
+    // Check that the context has not crashed
+    sc.parallelize(1 to 100).map(x => x*2).collect
+  }
+
   /** A contrived RDD that allows the manual addition of dependencies after creation. */
   private class CyclicalDependencyRDD[T: ClassTag] extends RDD[T](sc, Nil) {
     private val mutableDependencies: ArrayBuffer[Dependency[_]] = ArrayBuffer.empty
core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala

Lines changed: 40 additions & 0 deletions

@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.{ObjectInputStream, ObjectOutputStream, IOException}
+
+import org.apache.spark.TaskContext
+
+/**
+ * A Task implementation that fails to serialize.
+ */
+private[spark] class NotSerializableFakeTask(myId: Int, stageId: Int) extends Task[Array[Byte]](stageId, 0) {
+  override def runTask(context: TaskContext): Array[Byte] = Array.empty[Byte]
+  override def preferredLocations: Seq[TaskLocation] = Seq[TaskLocation]()
+
+  @throws(classOf[IOException])
+  private def writeObject(out: ObjectOutputStream): Unit = {
+    if (stageId == 0) {
+      throw new IllegalStateException("Cannot serialize")
+    }
+  }
+
+  @throws(classOf[IOException])
+  private def readObject(in: ObjectInputStream): Unit = {}
+}

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

Lines changed: 30 additions & 0 deletions
@@ -100,4 +100,34 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
     assert(1 === taskDescriptions.length)
     assert("executor0" === taskDescriptions(0).executorId)
   }
+
+  test("Scheduler does not crash when tasks are not serializable") {
+    sc = new SparkContext("local", "TaskSchedulerImplSuite")
+    val taskCpus = 2
+
+    sc.conf.set("spark.task.cpus", taskCpus.toString)
+    val taskScheduler = new TaskSchedulerImpl(sc)
+    taskScheduler.initialize(new FakeSchedulerBackend)
+    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
+    val dagScheduler = new DAGScheduler(sc, taskScheduler) {
+      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
+      override def executorAdded(execId: String, host: String) {}
+    }
+    val numFreeCores = 1
+    taskScheduler.setDAGScheduler(dagScheduler)
+    var taskSet = new TaskSet(Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
+    val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus),
+      new WorkerOffer("executor1", "host1", numFreeCores))
+    taskScheduler.submitTasks(taskSet)
+    var taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
+    assert(0 === taskDescriptions.length)
+
+    // Now check that we can still submit tasks
+    // Even if one of the tasks has not-serializable tasks, the other task set should still be processed without error
+    taskScheduler.submitTasks(taskSet)
+    taskScheduler.submitTasks(FakeTask.createTaskSet(1))
+    taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
+    assert(taskDescriptions.map(_.executorId) === Seq("executor0"))
+  }
+
 }

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 14 additions & 0 deletions
@@ -17,6 +17,7 @@

 package org.apache.spark.scheduler

+import java.io.{ObjectInputStream, ObjectOutputStream, IOException}
 import java.util.Random

 import scala.collection.mutable.ArrayBuffer
@@ -563,6 +564,19 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     assert(manager.emittedTaskSizeWarning)
   }

+  test("Not serializable exception thrown if the task cannot be serialized") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+
+    val taskSet = new TaskSet(Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
+
+    intercept[TaskNotSerializableException] {
+      manager.resourceOffer("exec1", "host1", ANY)
+    }
+    assert(manager.isZombie)
+  }
+
   test("abort the job if total size of results is too large") {
     val conf = new SparkConf().set("spark.driver.maxResultSize", "2m")
     sc = new SparkContext("local", "test", conf)

docs/_config.yml

Lines changed: 1 addition & 1 deletion
@@ -17,6 +17,6 @@ SPARK_VERSION: 1.3.0-SNAPSHOT
 SPARK_VERSION_SHORT: 1.3.0
 SCALA_BINARY_VERSION: "2.10"
 SCALA_VERSION: "2.10.4"
-MESOS_VERSION: 0.18.1
+MESOS_VERSION: 0.21.0
 SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK
 SPARK_GITHUB_URL: https://github.com/apache/spark

docs/running-on-yarn.md

Lines changed: 17 additions & 2 deletions
@@ -21,6 +21,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes

 <table class="table">
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+  <td><code>spark.yarn.am.memory</code></td>
+  <td>512m</td>
+  <td>
+    Amount of memory to use for the YARN Application Master in client mode, in the same format as JVM memory strings (e.g. <code>512m</code>, <code>2g</code>).
+    In cluster mode, use <code>spark.driver.memory</code> instead.
+  </td>
+</tr>
 <tr>
   <td><code>spark.yarn.am.waitTime</code></td>
   <td>100000</td>
@@ -90,7 +98,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
   <td><code>spark.yarn.driver.memoryOverhead</code></td>
   <td>driverMemory * 0.07, with minimum of 384 </td>
   <td>
-    The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).
+    The amount of off heap memory (in megabytes) to be allocated per driver in cluster mode. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).
+  </td>
+</tr>
+<tr>
+  <td><code>spark.yarn.am.memoryOverhead</code></td>
+  <td>AM memory * 0.07, with minimum of 384 </td>
+  <td>
+    Same as <code>spark.yarn.driver.memoryOverhead</code>, but for the Application Master in client mode.
   </td>
 </tr>
 <tr>
@@ -145,7 +160,7 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
   <td><code>spark.yarn.am.extraJavaOptions</code></td>
   <td>(none)</td>
   <td>
-  A string of extra JVM options to pass to the Yarn ApplicationMaster in client mode.
+  A string of extra JVM options to pass to the YARN Application Master in client mode.
   In cluster mode, use spark.driver.extraJavaOptions instead.
   </td>
 </tr>
