SPARK-2083 Add support for spark.local.maxFailures configuration property #1465

Closed · wants to merge 2 commits
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1463,12 +1463,13 @@ object SparkContext extends Logging {
     // Regular expression for connection to Simr cluster
     val SIMR_REGEX = """simr://(.*)""".r
 
-    // When running locally, don't try to re-execute tasks on failure.
+    // When running locally, by default don't try to re-execute tasks on failure.
     val MAX_LOCAL_TASK_FAILURES = 1
 
     master match {
       case "local" =>
-        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
+        val maxTaskFailures = sc.conf.getInt("spark.local.maxFailures", MAX_LOCAL_TASK_FAILURES)
+        val scheduler = new TaskSchedulerImpl(sc, maxTaskFailures, isLocal = true)
         val backend = new LocalBackend(scheduler, 1)
         scheduler.initialize(backend)
         scheduler
@@ -1477,7 +1478,8 @@
         def localCpuCount = Runtime.getRuntime.availableProcessors()
         // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
         val threadCount = if (threads == "*") localCpuCount else threads.toInt
-        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
+        val maxTaskFailures = sc.conf.getInt("spark.local.maxFailures", MAX_LOCAL_TASK_FAILURES)
+        val scheduler = new TaskSchedulerImpl(sc, maxTaskFailures, isLocal = true)
         val backend = new LocalBackend(scheduler, threadCount)
         scheduler.initialize(backend)
         scheduler
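With this change, both local-mode branches read spark.local.maxFailures from the context's SparkConf and fall back to the old hard-coded MAX_LOCAL_TASK_FAILURES of 1, so behavior is unchanged unless the property is set. A minimal sketch of how an application would opt in to task retries in local mode, assuming this patch is applied (the app name and the value 4 are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    // Allow each task up to 4 attempts (3 retries) in local mode.
    // "spark.local.maxFailures" is the property introduced by this patch.
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("local-retry-demo")
      .set("spark.local.maxFailures", "4")
    val sc = new SparkContext(conf)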
@@ -59,6 +59,22 @@ class SparkContextSchedulerCreationSuite
     }
   }
 
+  test("local-conf-failures") {
+    val defaultLocalMaxFailures = System.getProperty("spark.local.maxFailures")
+    System.setProperty("spark.local.maxFailures", "10")
+    val sched = createTaskScheduler("local")
+    assert(sched.maxTaskFailures === 10)
+    sched.backend match {
+      case s: LocalBackend => assert(s.totalCores === 1)
+      case _ => fail()
+    }
+
+    Option(defaultLocalMaxFailures) match {
+      case Some(v) => System.setProperty("spark.local.maxFailures", v)
+      case _ => System.clearProperty("spark.local.maxFailures")
+    }
+  }
+
   test("local-n") {
     val sched = createTaskScheduler("local[5]")
     assert(sched.maxTaskFailures === 1)
@@ -68,6 +84,22 @@ class SparkContextSchedulerCreationSuite
     }
   }
 
+  test("local-n-conf-failures") {
+    val defaultLocalMaxFailures = System.getProperty("spark.local.maxFailures")
+    System.setProperty("spark.local.maxFailures", "10")
+    val sched = createTaskScheduler("local[5]")
+    assert(sched.maxTaskFailures === 10)
+    sched.backend match {
+      case s: LocalBackend => assert(s.totalCores === 5)
+      case _ => fail()
+    }
+
+    Option(defaultLocalMaxFailures) match {
+      case Some(v) => System.setProperty("spark.local.maxFailures", v)
+      case _ => System.clearProperty("spark.local.maxFailures")
+    }
+  }
+
   test("local-n-failures") {
     val sched = createTaskScheduler("local[4, 2]")
     assert(sched.maxTaskFailures === 2)
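The new tests save the current value of spark.local.maxFailures, set it to 10, assert that the scheduler picks it up, and then restore or clear the property. Since JVM system properties are process-global, a failed assertion would skip the restore and leak the override into later tests; a sketch of the same save/override/restore pattern with the cleanup moved into a finally block (names mirror the tests above; nothing else is assumed):

    val saved = System.getProperty("spark.local.maxFailures")
    System.setProperty("spark.local.maxFailures", "10")
    try {
      val sched = createTaskScheduler("local")
      assert(sched.maxTaskFailures === 10)
    } finally {
      // Restore the previous value, or clear the property if it was never set.
      Option(saved) match {
        case Some(v) => System.setProperty("spark.local.maxFailures", v)
        case None => System.clearProperty("spark.local.maxFailures")
      }
    }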
9 changes: 9 additions & 0 deletions docs/configuration.md
@@ -599,6 +599,15 @@ Apart from these, the following properties are also available, and may be useful
   <td>
     Number of individual task failures before giving up on the job.
     Should be greater than or equal to 1. Number of allowed retries = this value - 1.
+    Does not apply to running Spark locally.
   </td>
 </tr>
+<tr>
+  <td><code>spark.local.maxFailures</code></td>
+  <td>1</td>
+  <td>
+    Number of individual task failures before giving up on the job, when running Spark locally.
+    Should be greater than or equal to 1. Number of allowed retries = this value - 1.
+  </td>
+</tr>
 <tr>
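As documented above, spark.task.maxFailures now explicitly excludes local mode, and spark.local.maxFailures covers it with the same "retries = value - 1" semantics. Because SparkConf also copies every JVM system property starting with "spark." into the configuration, the new setting can be supplied without touching application code, e.g. with -Dspark.local.maxFailures=4 on the driver JVM. A sketch of the equivalent in Scala, assuming this patch (the value 4 and the app name are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    // Equivalent to launching the driver JVM with -Dspark.local.maxFailures=4:
    // SparkConf (with loadDefaults = true) picks up every "spark.*" system property.
    System.setProperty("spark.local.maxFailures", "4")
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("demo"))
    assert(sc.getConf.getInt("spark.local.maxFailures", 1) == 4)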