
Commit 1678931

qqsun8819 authored and aarondav committed
SPARK-1099: Spark's local mode should probably respect spark.cores.max by default
This is for JIRA: https://spark-project.atlassian.net/browse/SPARK-1099, and this is what the patch does (also commented in the JIRA). @aarondav This is really a behavioral change, so I do this with great caution and welcome any review advice:

1. Change the "MASTER=local" pattern of creating LocalBackend. In the past we passed 1 core to it; now it uses a default core count. The reason is that when someone uses spark-shell to start local mode, the REPL uses this "MASTER=local" pattern by default, so cores specified on the spark-shell command line also end up here. Passing a hard-coded 1 core is therefore not suitable with this change.

2. In LocalBackend, the "totalCores" variable is fetched by a different rule (in the past it just took the user-passed cores, e.g. 1 for the "MASTER=local" pattern, 2 for the "MASTER=local[2]" pattern):
   a. The second argument of LocalBackend's constructor, which indicates cores, has a default value of Int.MaxValue. If the user didn't pass it, it stays at Int.MaxValue.
   b. In getMaxCores, we first compare that value to Int.MaxValue. If it is not equal, the user has passed their desired value, so we just use it.
   c. If b is not satisfied, we get the cores from spark.cores.max and the real logical core count from Runtime. If the cores specified by spark.cores.max exceed the logical cores, we use the logical cores; otherwise we use spark.cores.max (see the sketch after this message).

3. In SparkContextSchedulerCreationSuite's test("local") case, the assertion is modified from 1 to the logical core count, because the "MASTER=local" pattern now uses default values.

Author: qqsun8819 <jin.oyj@alibaba-inc.com>

Closes #110 from qqsun8819/local-cores and squashes the following commits:

731aefa [qqsun8819] 1. LocalBackend unchanged; 2. in SparkContext, process the cores and pass them to the original LocalBackend constructor
78b9c60 [qqsun8819] 1. SparkContext "MASTER=local" pattern uses default cores instead of 1 to construct LocalBackend, for spark-shell and cores specified on the command line; 2. some test cases changed from local to local[1]; 3. SparkContextSchedulerCreationSuite tests the spark.cores.max config in the local pattern
6ae1ee8 [qqsun8819] Add a static function in LocalBackend to let it use the cores specified by spark.cores.max when no cores are passed to it
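A minimal sketch of the a/b/c rule described in point 2, assuming a standalone helper: the name getMaxCores and the Int.MaxValue sentinel come from the description above, everything else is illustrative, and note that the final squashed patch ended up computing the cores in SparkContext instead (see the diff below).

```scala
import org.apache.spark.SparkConf

// Illustrative sketch of the a/b/c rule above; not the committed code.
object LocalCoresSketch {
  def getMaxCores(conf: SparkConf, passedCores: Int = Int.MaxValue): Int = {
    if (passedCores != Int.MaxValue) {
      // (b) an explicit core count was passed, e.g. "local[2]" -> 2
      passedCores
    } else {
      // (c) fall back to spark.cores.max, clamped to the machine's logical cores
      val logicalCores = Runtime.getRuntime.availableProcessors()
      math.min(conf.getInt("spark.cores.max", logicalCores), logicalCores)
    }
  }
}
```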
1 parent 67fa71c commit 1678931

File tree

3 files changed (+22, −6 lines)


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 4 additions & 1 deletion
@@ -1262,7 +1262,10 @@ object SparkContext extends Logging {
     master match {
       case "local" =>
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
-        val backend = new LocalBackend(scheduler, 1)
+        // Use user specified in config, up to all available cores
+        val realCores = Runtime.getRuntime.availableProcessors()
+        val toUseCores = math.min(sc.conf.getInt("spark.cores.max", realCores), realCores)
+        val backend = new LocalBackend(scheduler, toUseCores)
         scheduler.initialize(backend)
         scheduler
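With this change, an application that uses the plain "local" master can cap its cores through spark.cores.max. A minimal script-style usage sketch, assuming a Spark build containing this patch (the app name and the value "2" are arbitrary):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// In plain "local" mode the backend now honors spark.cores.max,
// clamped to Runtime.getRuntime.availableProcessors().
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("local-cores-demo") // arbitrary name for this sketch
  .set("spark.cores.max", "2")    // values above the machine's core count are clamped
val sc = new SparkContext(conf)
// ... run jobs ...
sc.stop()
```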

core/src/test/scala/org/apache/spark/FileSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -34,7 +34,7 @@ import org.apache.spark.SparkContext._
 class FileSuite extends FunSuite with LocalSparkContext {
 
   test("text files") {
-    sc = new SparkContext("local", "test")
+    sc = new SparkContext("local[1]", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 4)
@@ -176,7 +176,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
 
   test("write SequenceFile using new Hadoop API") {
     import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
-    sc = new SparkContext("local", "test")
+    sc = new SparkContext("local[1]", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
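These tests previously relied on plain "local" giving exactly one core; after this patch that is no longer guaranteed, so they pin the count with "local[1]". A small sketch of the distinction, assuming spark.default.parallelism is not set elsewhere:

```scala
import org.apache.spark.SparkContext

// "local[1]" pins the backend to exactly one core regardless of spark.cores.max,
// which keeps the single-threaded assumptions in these tests deterministic.
val sc = new SparkContext("local[1]", "test")
assert(sc.defaultParallelism == 1) // plain "local" would now default to the machine's core count
sc.stop()
```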

core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala

Lines changed: 16 additions & 3 deletions
@@ -27,10 +27,10 @@ import org.apache.spark.scheduler.local.LocalBackend
 class SparkContextSchedulerCreationSuite
   extends FunSuite with PrivateMethodTester with LocalSparkContext with Logging {
 
-  def createTaskScheduler(master: String): TaskSchedulerImpl = {
+  def createTaskScheduler(master: String, conf: SparkConf = new SparkConf()): TaskSchedulerImpl = {
     // Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
     // real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
-    sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test", conf)
     val createTaskSchedulerMethod = PrivateMethod[TaskScheduler]('createTaskScheduler)
     val sched = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
     sched.asInstanceOf[TaskSchedulerImpl]
@@ -44,13 +44,26 @@ class SparkContextSchedulerCreationSuite
   }
 
   test("local") {
-    val sched = createTaskScheduler("local")
+    var conf = new SparkConf()
+    conf.set("spark.cores.max", "1")
+    val sched = createTaskScheduler("local", conf)
     sched.backend match {
       case s: LocalBackend => assert(s.totalCores === 1)
       case _ => fail()
     }
   }
 
+  test("local-cores-exceed") {
+    val cores = Runtime.getRuntime.availableProcessors() + 1
+    var conf = new SparkConf()
+    conf.set("spark.cores.max", cores.toString)
+    val sched = createTaskScheduler("local", conf)
+    sched.backend match {
+      case s: LocalBackend => assert(s.totalCores === Runtime.getRuntime.availableProcessors())
+      case _ => fail()
+    }
+  }
+
   test("local-n") {
     val sched = createTaskScheduler("local[5]")
     assert(sched.maxTaskFailures === 1)
