[SPARK-26019][PYSPARK] Allow insecure py4j gateways #23337
@@ -112,6 +112,20 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
        ValueError:...
        """
        self._callsite = first_spark_call() or CallSite(None, None, None)
        if gateway is not None and gateway.gateway_parameters.auth_token is None:
            allow_insecure_env = os.environ.get("PYSPARK_ALLOW_INSECURE_GATEWAY", "0")
            if allow_insecure_env == "1" or allow_insecure_env.lower() == "true":
                warnings.warn(
                    "You are passing in an insecure Py4j gateway. This "
                    "presents a security risk, and will be completely forbidden in Spark 3.0")
            else:
                raise ValueError(
                    "You are trying to pass an insecure Py4j gateway to Spark. This"
                    " presents a security risk. If you are sure you understand and accept this"
                    " risk, you can set the environment variable"
                    " 'PYSPARK_ALLOW_INSECURE_GATEWAY=1', but"
                    " note this option will be removed in Spark 3.0")
Reviewer comment: +1. Honestly, I still think an insecure gateway is a misuse of Spark and it should be removed. I'm going to merge this as an effort to make upgrading Spark easier for other projects like Zeppelin.
        SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
        try:
            self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
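For reference, a downstream embedder that manages its own Py4J gateway would hit the new check roughly as in the sketch below. It reuses the private `_launch_gateway(insecure=True)` helper exactly as the tests in this PR do; treat it as an illustration of the opt-in flow under those assumptions, not a supported public API.

```python
import os
from pyspark import SparkContext
from pyspark.java_gateway import _launch_gateway  # private helper, used the same way as in the tests below

# A gateway launched with insecure=True has no auth token on its gateway parameters.
gateway = _launch_gateway(insecure=True)

# Without the opt-in, SparkContext(gateway=gateway) raises ValueError.
os.environ["PYSPARK_ALLOW_INSECURE_GATEWAY"] = "1"
try:
    sc = SparkContext(gateway=gateway)  # now only emits a warning
    print(sc.parallelize([1, 2, 3]).sum())
    sc.stop()
finally:
    os.environ.pop("PYSPARK_ALLOW_INSECURE_GATEWAY", None)
```

Note that the new check only looks at `gateway.gateway_parameters.auth_token`, so a gateway created with an auth token is unaffected by the environment variable.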
@@ -61,6 +61,7 @@
from pyspark import keyword_only
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.java_gateway import _launch_gateway
from pyspark.rdd import RDD
from pyspark.files import SparkFiles
from pyspark.serializers import read_int, BatchedSerializer, MarshalSerializer, PickleSerializer, \
@@ -2381,6 +2382,37 @@ def test_startTime(self):
        with SparkContext() as sc:
            self.assertGreater(sc.startTime, 0)

    def test_forbid_insecure_gateway(self):
        # By default, we fail immediately if you try to create a SparkContext
        # with an insecure gateway
        gateway = _launch_gateway(insecure=True)
        log4j = gateway.jvm.org.apache.log4j
        old_level = log4j.LogManager.getRootLogger().getLevel()
        try:
            log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL)
            with self.assertRaises(Exception) as context:
                SparkContext(gateway=gateway)
            self.assertIn("insecure Py4j gateway", str(context.exception))
            self.assertIn("PYSPARK_ALLOW_INSECURE_GATEWAY", str(context.exception))
            self.assertIn("removed in Spark 3.0", str(context.exception))
        finally:
            log4j.LogManager.getRootLogger().setLevel(old_level)
    def test_allow_insecure_gateway_with_conf(self):
        with SparkContext._lock:
            SparkContext._gateway = None
            SparkContext._jvm = None
Review thread on the two reset lines above:

PR author: This part of the test really bothers me, so I'd like to explain it to reviewers. Without this, the test passes -- but it passes even without the changes to the main code! Or rather, it only passes when run as part of the entire suite; it would fail when run individually. What's happening is that [the class-level gateway and JVM handles left over from earlier tests get reused, so the gateway passed in here would otherwise be ignored]. That in itself sounds crazy to me, and seems like a problem for things like Zeppelin. I tried just adding these two lines into ... I was hoping somebody else might have some ideas about what is going on, or whether there is a better way to do this.

Reviewer: I have usually made separate test classes in cases like this, since it's closely related to stop and start (for instance, https://github.com/apache/spark/blob/master/python/pyspark/sql/tests/test_session.py#L27-L73 and https://github.com/apache/spark/blob/master/python/pyspark/sql/tests/test_dataframe.py#L680-L682).

PR author: I don't think that's really answering my question. I don't have a problem calling start and stop; I'm wondering why spark/python/pyspark/context.py lines 300 to 302 (at 4d693ac) [behave the way they do]. For normal use of Spark that's not a problem, but it seems like it would be a problem (a) in our tests and (b) for systems like Zeppelin that might have multiple Spark contexts over the lifetime of the Python session (I assume, anyway ...).

Reviewer: Ah, gotcha. In that case we could consider simply moving this to the top in a separate test class, but yes, I suspect tests depending on their order isn't a great idea either. I'm okay as long as the tests pass; I can take a separate look at this later.

Reviewer: Yeah, this logic is sort of rough. In spark-testing-base, for example, between tests where folks do not intend to reuse the same Spark context, we also clear some extra properties (although we do reuse the gateway). I think for environments where folks want multiple SparkContexts from Python on the same machine, they end up using multiple Python processes anyway.

PR author: OK, it's good to get some confirmation of this weird behavior ... but I feel like I still don't understand why we don't reset [the class-level `_gateway` and `_jvm`].
        gateway = _launch_gateway(insecure=True)
        try:
            os.environ["PYSPARK_ALLOW_INSECURE_GATEWAY"] = "1"
            with SparkContext(gateway=gateway) as sc:
                a = sc.accumulator(1)
                rdd = sc.parallelize([1, 2, 3])
                rdd.foreach(lambda x: a.add(x))
                self.assertEqual(7, a.value)
        finally:
            os.environ.pop("PYSPARK_ALLOW_INSECURE_GATEWAY", None)


class ConfTests(unittest.TestCase):
    def test_memory_conf(self):
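To illustrate the behavior debated in the review thread above -- the test nulls out `SparkContext._gateway` and `SparkContext._jvm` by hand because stopping a context does not clear them -- here is a minimal, self-contained sketch of that class-level-state pattern. It is a simplified stand-in, not the actual `SparkContext` code; the name `MiniContext` and its placeholder values are invented for illustration.

```python
import threading


class MiniContext(object):
    """Simplified stand-in (not real Spark code) for SparkContext's class-level JVM state."""
    _lock = threading.RLock()
    _gateway = None   # shared by every instance in this Python process
    _jvm = None
    _active = None

    def __init__(self, gateway=None):
        with MiniContext._lock:
            if MiniContext._gateway is None:
                # The first context in the process adopts (or launches) a gateway.
                MiniContext._gateway = gateway if gateway is not None else "launched-gateway"
                MiniContext._jvm = "jvm-view"
            # Any later context silently reuses the existing gateway,
            # so a `gateway` argument passed here is ignored.
            MiniContext._active = self

    def stop(self):
        with MiniContext._lock:
            # Only the active-context reference is cleared; the gateway
            # and JVM handles survive for reuse by the next context.
            MiniContext._active = None


# Earlier tests leave a (secure) gateway behind ...
MiniContext().stop()
# ... so a later context ignores the gateway it is handed,
insecure = MiniContext(gateway="insecure-gateway")
assert MiniContext._gateway != "insecure-gateway"
# which is why the test resets _gateway/_jvm under the lock first.
```

Resetting the two attributes under `SparkContext._lock`, as the test above does, forces the next `SparkContext` to actually use the gateway passed to it.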