Skip to content

Commit 284771e

Browse files
advancedxy authored and aarondav committed
[Spark 2557] fix LOCAL_N_REGEX in createTaskScheduler and make local-n and local-n-failures consistent
[SPARK-2557](https://issues.apache.org/jira/browse/SPARK-2557) Author: Ye Xianjin <advancedxy@gmail.com> Closes #1464 from advancedxy/SPARK-2557 and squashes the following commits: d844d67 [Ye Xianjin] add local-*-n-failures, bad-local-n, bad-local-n-failures test case 3bbc668 [Ye Xianjin] fix LOCAL_N_REGEX regular expression and make local_n_failures accept * as all cores on the computer
1 parent f1957e1 commit 284771e

File tree

2 files changed

+30
-3
lines changed

2 files changed

+30
-3
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 7 additions & 3 deletions
Original file line number · Diff line number · Diff line change
@@ -1452,9 +1452,9 @@ object SparkContext extends Logging {
14521452
/** Creates a task scheduler based on a given master URL. Extracted for testing. */
14531453
private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
14541454
// Regular expression used for local[N] and local[*] master formats
1455-
val LOCAL_N_REGEX = """local\[([0-9\*]+)\]""".r
1455+
val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
14561456
// Regular expression for local[N, maxRetries], used in tests with failing tasks
1457-
val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
1457+
val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
14581458
// Regular expression for simulating a Spark cluster of [N, cores, memory] locally
14591459
val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
14601460
// Regular expression for connecting to Spark deploy clusters
@@ -1484,8 +1484,12 @@ object SparkContext extends Logging {
14841484
scheduler
14851485

14861486
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
1487+
def localCpuCount = Runtime.getRuntime.availableProcessors()
1488+
// local[*, M] means the number of cores on the computer with M failures
1489+
// local[N, M] means exactly N threads with M failures
1490+
val threadCount = if (threads == "*") localCpuCount else threads.toInt
14871491
val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
1488-
val backend = new LocalBackend(scheduler, threads.toInt)
1492+
val backend = new LocalBackend(scheduler, threadCount)
14891493
scheduler.initialize(backend)
14901494
scheduler
14911495

core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala

Lines changed: 23 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -68,6 +68,15 @@ class SparkContextSchedulerCreationSuite
6868
}
6969
}
7070

71+
test("local-*-n-failures") {
72+
val sched = createTaskScheduler("local[* ,2]")
73+
assert(sched.maxTaskFailures === 2)
74+
sched.backend match {
75+
case s: LocalBackend => assert(s.totalCores === Runtime.getRuntime.availableProcessors())
76+
case _ => fail()
77+
}
78+
}
79+
7180
test("local-n-failures") {
7281
val sched = createTaskScheduler("local[4, 2]")
7382
assert(sched.maxTaskFailures === 2)
@@ -77,6 +86,20 @@ class SparkContextSchedulerCreationSuite
7786
}
7887
}
7988

89+
test("bad-local-n") {
90+
val e = intercept[SparkException] {
91+
createTaskScheduler("local[2*]")
92+
}
93+
assert(e.getMessage.contains("Could not parse Master URL"))
94+
}
95+
96+
test("bad-local-n-failures") {
97+
val e = intercept[SparkException] {
98+
createTaskScheduler("local[2*,4]")
99+
}
100+
assert(e.getMessage.contains("Could not parse Master URL"))
101+
}
102+
80103
test("local-default-parallelism") {
81104
val defaultParallelism = System.getProperty("spark.default.parallelism")
82105
System.setProperty("spark.default.parallelism", "16")

0 commit comments

Comments (0)