Skip to content

Commit 41e1a54

Browse files
Marcelo Vanzin, cmonkey
Marcelo Vanzin
authored and committed
[SPARK-19307][PYSPARK] Make sure user conf is propagated to SparkContext.
The code was failing to propagate the user conf in the case where the JVM was already initialized, which happens when a user submits a Python script via spark-submit.

Tested with a new unit test and by running a Python script in a real cluster.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes apache#16682 from vanzin/SPARK-19307.
1 parent 1307432 commit 41e1a54

File tree

2 files changed

+23
-0
lines changed

2 files changed

+23
-0
lines changed

python/pyspark/context.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -132,6 +132,9 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
132132
self._conf = conf
133133
else:
134134
self._conf = SparkConf(_jvm=SparkContext._jvm)
135+
if conf is not None:
136+
for k, v in conf.getAll():
137+
self._conf.set(k, v)
135138

136139
self._batchSize = batchSize # -1 represents an unlimited batch size
137140
self._unbatched_serializer = serializer

python/pyspark/tests.py

Lines changed: 20 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2035,6 +2035,26 @@ def test_single_script_on_cluster(self):
20352035
self.assertEqual(0, proc.returncode)
20362036
self.assertIn("[2, 4, 6]", out.decode('utf-8'))
20372037

2038+
def test_user_configuration(self):
    """Make sure user configuration is respected (SPARK-19307).

    Submits a small script through spark-submit. The script sets a custom
    key on a SparkConf, builds a SparkContext from that conf, and raises if
    the key cannot be read back through the context's conf. The test passes
    when the spark-submit process exits with status 0.
    """
    # The "|" characters mark the left margin of the embedded script.
    # NOTE(review): assumes createTempFile strips the indentation up to and
    # including "|" -- confirm against the helper's implementation.
    script = self.createTempFile("test.py", """
        |from pyspark import SparkConf, SparkContext
        |
        |conf = SparkConf().set("spark.test_config", "1")
        |sc = SparkContext(conf = conf)
        |try:
        |    if sc._conf.get("spark.test_config") != "1":
        |        raise Exception("Cannot find spark.test_config in SparkContext's conf.")
        |finally:
        |    sc.stop()
        """)
    proc = subprocess.Popen(
        [self.sparkSubmit, "--master", "local", script],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT)
    # stderr is redirected into stdout, so the second element is always None.
    out, _ = proc.communicate()
    # Decode the captured bytes so the failure message is readable text
    # rather than a bytes repr.
    self.assertEqual(
        0, proc.returncode,
        msg="Process failed with error:\n {0}".format(out.decode('utf-8', 'replace')))
2057+
20382058

20392059
class ContextTests(unittest.TestCase):
20402060

0 commit comments

Comments
 (0)