Skip to content

Commit 78b9c60

Browse files
author
qqsun8819
committed
1 SparkContext MASTER=local pattern use default cores instead of 1 to construct LocalBackEnd , for use of spark-shell and cores specified in cmd line 2 some test case change from local to local[1]. 3 SparkContextSchedulerCreationSuite test spark.cores.max config in local pattern
1 parent 6ae1ee8 commit 78b9c60

File tree

4 files changed

+19
-9
lines changed

4 files changed

+19
-9
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1204,7 +1204,7 @@ object SparkContext extends Logging {
12041204
master match {
12051205
case "local" =>
12061206
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
1207-
val backend = new LocalBackend(scheduler, 1)
1207+
val backend = new LocalBackend(scheduler)
12081208
scheduler.initialize(backend)
12091209
scheduler
12101210

core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,5 @@ private[spark] object LocalBackend {
127127
retCores
128128
}
129129
}
130-
131-
132-
133130
}
134131
}

core/src/test/scala/org/apache/spark/FileSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
3333
class FileSuite extends FunSuite with LocalSparkContext {
3434

3535
test("text files") {
36-
sc = new SparkContext("local", "test")
36+
sc = new SparkContext("local[1]", "test")
3737
val tempDir = Files.createTempDir()
3838
val outputDir = new File(tempDir, "output").getAbsolutePath
3939
val nums = sc.makeRDD(1 to 4)
@@ -175,7 +175,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
175175

176176
test("write SequenceFile using new Hadoop API") {
177177
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
178-
sc = new SparkContext("local", "test")
178+
sc = new SparkContext("local[1]", "test")
179179
val tempDir = Files.createTempDir()
180180
val outputDir = new File(tempDir, "output").getAbsolutePath
181181
val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))

core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@ import org.apache.spark.scheduler.local.LocalBackend
2727
class SparkContextSchedulerCreationSuite
2828
extends FunSuite with PrivateMethodTester with LocalSparkContext with Logging {
2929

30-
def createTaskScheduler(master: String): TaskSchedulerImpl = {
30+
def createTaskScheduler(master: String, conf: SparkConf = new SparkConf()): TaskSchedulerImpl = {
3131
// Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
3232
// real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
33-
sc = new SparkContext("local", "test")
33+
sc = new SparkContext("local", "test", conf)
3434
val createTaskSchedulerMethod = PrivateMethod[TaskScheduler]('createTaskScheduler)
3535
val sched = SparkContext invokePrivate createTaskSchedulerMethod(sc, master, "test")
3636
sched.asInstanceOf[TaskSchedulerImpl]
@@ -44,13 +44,26 @@ class SparkContextSchedulerCreationSuite
4444
}
4545

4646
test("local") {
47-
val sched = createTaskScheduler("local")
47+
var conf = new SparkConf()
48+
conf.set("spark.cores.max", "1")
49+
val sched = createTaskScheduler("local", conf)
4850
sched.backend match {
4951
case s: LocalBackend => assert(s.totalCores === 1)
5052
case _ => fail()
5153
}
5254
}
5355

56+
test("local-cores-exceed") {
57+
val cores = Runtime.getRuntime.availableProcessors() + 1
58+
var conf = new SparkConf()
59+
conf.set("spark.cores.max", cores.toString)
60+
val sched = createTaskScheduler("local", conf)
61+
sched.backend match {
62+
case s: LocalBackend => assert(s.totalCores === Runtime.getRuntime.availableProcessors())
63+
case _ => fail()
64+
}
65+
}
66+
5467
test("local-n") {
5568
val sched = createTaskScheduler("local[5]")
5669
assert(sched.maxTaskFailures === 1)

0 commit comments

Comments
 (0)