
Commit 86efa45

ueshin authored and HyukjinKwon committed
[SPARK-32160][CORE][PYSPARK] Disallow to create SparkContext in executors
### What changes were proposed in this pull request?

This PR proposes to disallow creating `SparkContext` in executors, e.g., in UDFs.

### Why are the changes needed?

Currently executors can create a `SparkContext`, but they shouldn't be able to:

```scala
sc.range(0, 1).foreach { _ =>
  new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
}
```

### Does this PR introduce _any_ user-facing change?

Yes, users won't be able to create a `SparkContext` in executors.

### How was this patch tested?

Adds tests.

Closes #28986 from ueshin/issues/SPARK-32160/disallow_spark_context_in_executors.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(cherry picked from commit cfecc20)
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
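For the PySpark side, here is a minimal sketch of the same user-facing failure, mirroring the new test in python/pyspark/tests/test_context.py (the `local-cluster[3, 1, 1024]` master and the error message are taken from this patch; everything else is illustrative):

```python
from pyspark import SparkContext

# Driver-side context, as in the new PySpark test.
with SparkContext("local-cluster[3, 1, 1024]") as sc:
    # After this patch, constructing a SparkContext inside a task raises an
    # exception whose message contains:
    #   "SparkContext should only be created and accessed on the driver."
    sc.range(2).foreach(lambda _: SparkContext())
```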
1 parent f9d53a6 commit 86efa45

File tree

6 files changed (+67 -20 lines)


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 16 additions & 0 deletions
@@ -82,6 +82,9 @@ class SparkContext(config: SparkConf) extends Logging {
   // The call site where this SparkContext was constructed.
   private val creationSite: CallSite = Utils.getCallSite()
 
+  // In order to prevent SparkContext from being created in executors.
+  SparkContext.assertOnDriver()
+
   // In order to prevent multiple SparkContexts from being active at the same time, mark this
   // context as having started construction.
   // NOTE: this must be placed at the beginning of the SparkContext constructor.
@@ -2539,6 +2542,19 @@ object SparkContext extends Logging {
     }
   }
 
+  /**
+   * Called to ensure that SparkContext is created or accessed only on the Driver.
+   *
+   * Throws an exception if a SparkContext is about to be created in executors.
+   */
+  private def assertOnDriver(): Unit = {
+    if (TaskContext.get != null) {
+      // we're accessing it during task execution, fail.
+      throw new IllegalStateException(
+        "SparkContext should only be created and accessed on the driver.")
+    }
+  }
+
   /**
    * This function may be used to get or instantiate a SparkContext and register it as a
    * singleton object. Because we can only have one active SparkContext per JVM,

core/src/test/scala/org/apache/spark/SparkContextSuite.scala

Lines changed: 12 additions & 0 deletions
@@ -950,6 +950,18 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       }
     }
   }
+
+  test("SPARK-32160: Disallow to create SparkContext in executors") {
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+
+    val error = intercept[SparkException] {
+      sc.range(0, 1).foreach { _ =>
+        new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+      }
+    }.getMessage()
+
+    assert(error.contains("SparkContext should only be created and accessed on the driver."))
+  }
 }
 
 object SparkContextSuite {

python/pyspark/context.py

Lines changed: 14 additions & 0 deletions
@@ -38,6 +38,7 @@
 from pyspark.storagelevel import StorageLevel
 from pyspark.resource import ResourceInformation
 from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix
+from pyspark.taskcontext import TaskContext
 from pyspark.traceback_utils import CallSite, first_spark_call
 from pyspark.status import StatusTracker
 from pyspark.profiler import ProfilerCollector, BasicProfiler
@@ -118,6 +119,9 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         ...
         ValueError:...
         """
+        # In order to prevent SparkContext from being created in executors.
+        SparkContext._assert_on_driver()
+
         self._callsite = first_spark_call() or CallSite(None, None, None)
         if gateway is not None and gateway.gateway_parameters.auth_token is None:
             raise ValueError(
@@ -1145,6 +1149,16 @@ def resources(self):
             resources[name] = ResourceInformation(name, addrs)
         return resources
 
+    @staticmethod
+    def _assert_on_driver():
+        """
+        Called to ensure that SparkContext is created only on the Driver.
+
+        Throws an exception if a SparkContext is about to be created in executors.
+        """
+        if TaskContext.get() is not None:
+            raise Exception("SparkContext should only be created and accessed on the driver.")
+
 
 def _test():
     import atexit
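On the Python side the check hinges on `TaskContext.get()`, which returns `None` on the driver and the current `TaskContext` inside a running task. A small sketch of that distinction (the master, app name, RDD, and prints are illustrative only, not part of the patch):

```python
from pyspark import SparkContext, TaskContext

sc = SparkContext("local[2]", "taskcontext-demo")  # illustrative master/app name

# On the driver no task is running, so get() returns None and
# _assert_on_driver() passes.
print(TaskContext.get())  # None

def partition_of(_):
    # Inside an executor task, get() returns the active TaskContext;
    # this is exactly the condition _assert_on_driver() rejects.
    return TaskContext.get().partitionId()

print(sc.range(4, numSlices=2).map(partition_of).collect())  # e.g. [0, 0, 1, 1]
sc.stop()
```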

python/pyspark/tests/test_context.py

Lines changed: 8 additions & 0 deletions
@@ -267,6 +267,14 @@ def test_resources(self):
         resources = sc.resources
         self.assertEqual(len(resources), 0)
 
+    def test_disallow_to_create_spark_context_in_executors(self):
+        # SPARK-32160: SparkContext should not be created in executors.
+        with SparkContext("local-cluster[3, 1, 1024]") as sc:
+            with self.assertRaises(Exception) as context:
+                sc.range(2).foreach(lambda _: SparkContext())
+            self.assertIn("SparkContext should only be created and accessed on the driver.",
+                          str(context.exception))
+
 
 class ContextTestsWithResources(unittest.TestCase):

sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala

Lines changed: 1 addition & 1 deletion
@@ -1086,7 +1086,7 @@ object SparkSession extends Logging {
   }
 
   private def assertOnDriver(): Unit = {
-    if (Utils.isTesting && TaskContext.get != null) {
+    if (TaskContext.get != null) {
       // we're accessing it during task execution, fail.
       throw new IllegalStateException(
         "SparkSession should only be created and accessed on the driver.")

sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala

Lines changed: 16 additions & 19 deletions
@@ -27,32 +27,29 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 
 class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSparkContext {
   private val random = new java.util.Random()
-  private var taskContext: TaskContext = _
-
-  override def afterAll(): Unit = try {
-    TaskContext.unset()
-  } finally {
-    super.afterAll()
-  }
 
   private def withExternalArray(inMemoryThreshold: Int, spillThreshold: Int)
                                (f: ExternalAppendOnlyUnsafeRowArray => Unit): Unit = {
     sc = new SparkContext("local", "test", new SparkConf(false))
 
-    taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
+    val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
     TaskContext.setTaskContext(taskContext)
 
-    val array = new ExternalAppendOnlyUnsafeRowArray(
-      taskContext.taskMemoryManager(),
-      SparkEnv.get.blockManager,
-      SparkEnv.get.serializerManager,
-      taskContext,
-      1024,
-      SparkEnv.get.memoryManager.pageSizeBytes,
-      inMemoryThreshold,
-      spillThreshold)
-    try f(array) finally {
-      array.clear()
+    try {
+      val array = new ExternalAppendOnlyUnsafeRowArray(
+        taskContext.taskMemoryManager(),
+        SparkEnv.get.blockManager,
+        SparkEnv.get.serializerManager,
+        taskContext,
+        1024,
+        SparkEnv.get.memoryManager.pageSizeBytes,
+        inMemoryThreshold,
+        spillThreshold)
+      try f(array) finally {
+        array.clear()
+      }
+    } finally {
+      TaskContext.unset()
     }
   }
