Skip to content

Commit c9f075a

Browse files
author
Marcelo Vanzin
committed
[SPARK-19307][PYSPARK] Make sure user conf is propagated to SparkContext.
The code was failing to propagate the user conf in the case where the JVM was already initialized, which happens when a user submits a python script via spark-submit.

Tested with new unit test and by running a python script in a real cluster.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #16682 from vanzin/SPARK-19307.

(cherry picked from commit 92afaa9)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
1 parent af95455 commit c9f075a

File tree

2 files changed

+23
-0
lines changed

2 files changed

+23
-0
lines changed

python/pyspark/context.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -132,6 +132,9 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
132132
self._conf = conf
133133
else:
134134
self._conf = SparkConf(_jvm=SparkContext._jvm)
135+
if conf is not None:
136+
for k, v in conf.getAll():
137+
self._conf.set(k, v)
135138

136139
self._batchSize = batchSize # -1 represents an unlimited batch size
137140
self._unbatched_serializer = serializer

python/pyspark/tests.py

Lines changed: 20 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1970,6 +1970,26 @@ def test_single_script_on_cluster(self):
19701970
self.assertEqual(0, proc.returncode)
19711971
self.assertIn("[2, 4, 6]", out.decode('utf-8'))
19721972

1973+
def test_user_configuration(self):
1974+
"""Make sure user configuration is respected (SPARK-19307)"""
1975+
script = self.createTempFile("test.py", """
1976+
|from pyspark import SparkConf, SparkContext
1977+
|
1978+
|conf = SparkConf().set("spark.test_config", "1")
1979+
|sc = SparkContext(conf = conf)
1980+
|try:
1981+
| if sc._conf.get("spark.test_config") != "1":
1982+
| raise Exception("Cannot find spark.test_config in SparkContext's conf.")
1983+
|finally:
1984+
| sc.stop()
1985+
""")
1986+
proc = subprocess.Popen(
1987+
[self.sparkSubmit, "--master", "local", script],
1988+
stdout=subprocess.PIPE,
1989+
stderr=subprocess.STDOUT)
1990+
out, err = proc.communicate()
1991+
self.assertEqual(0, proc.returncode, msg="Process failed with error:\n {0}".format(out))
1992+
19731993

19741994
class ContextTests(unittest.TestCase):
19751995

0 commit comments

Comments
 (0)