[SPARK-25255][PYTHON] Add getActiveSession to SparkSession in PySpark #22295
Conversation
Test build #95507 has finished for PR 22295 at commit
Test build #95509 has finished for PR 22295 at commit
python/pyspark/sql/session.py
Outdated
Does this return a JVM instance?
@HyukjinKwon Sorry for the late reply. Yes, this returns a JVM instance.
In the Scala code, SparkSession.getActiveSession returns an Option[SparkSession].
I am not sure how to write a Python equivalent of Scala's Option. In the following code, is there a way to wrap the Python session in the else path in something equivalent to Scala's Option? If not, can I just return the Python session?
if self._jsparkSession.getActiveSession() is None:
    return None
else:
    return self.__class__(self._sc, self._jsparkSession.getActiveSession().get())
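For illustration, the usual Pythonic stand-in for Scala's Option is simply the value or None. A minimal sketch, assuming the py4j proxy for the Scala Option exposes isDefined()/get() (as it is used later in this thread):

# Sketch only: Python has no Option type; returning the session or None
# is the idiomatic equivalent.
j_opt = self._jsparkSession.getActiveSession()  # py4j proxy of a Scala Option
if j_opt.isDefined():
    return self.__class__(self._sc, j_opt.get())
return None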
Yeah, I think we should return the Python session. The JVM instance should not be exposed. I assume returning None is fine. The thing is, we lack session support in PySpark. It's partially implemented but not very well tested, as far as I can tell.
Can you add a set of tests for it, and manually test them as well? Actually, my gut says this is quite a big deal.
@HyukjinKwon I added a set of tests. Some of them are borrowed from SparkSessionBuilderSuite.scala.
python/pyspark/sql/session.py
Outdated
So normally we try to have doctests like these be examples of how the user should use this. So I would consider getting the active session and then doing something a normal user would with it (like parallelizing a collection).
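For instance, a sketch of such a doctest (illustrative only; it assumes a session is already active, as in the shared doctest globals):

>>> s = SparkSession.getActiveSession()
>>> l = [('Alice', 1)]
>>> rdd = s.sparkContext.parallelize(l)
>>> df = s.createDataFrame(rdd, ['name', 'age'])
>>> df.collect()
[Row(name='Alice', age=1)]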
...and it probably shouldn't access _jsparkSession.
@holdenk @felixcheung Thanks for the review. I will change this.
Test build #95815 has finished for PR 22295 at commit
Test build #95889 has finished for PR 22295 at commit
python/pyspark/sql/session.py
Outdated
@huaxingao, let's target this for 3.0.
Test build #95953 has finished for PR 22295 at commit
holdenk left a comment
This looks really close. The one thing which I'd like to see added is a test for getActiveSession when there is no active session.
python/pyspark/sql/session.py
Outdated
Thanks for catching this! Filed a follow-up: https://issues.apache.org/jira/browse/SPARK-25432
Thank you very much for your comments.
I have a question here. In the stop() method, shall we clear the active session too? Currently, it has:
def stop(self):
    """Stop the underlying :class:`SparkContext`.
    """
    self._jvm.SparkSession.clearDefaultSession()
    SparkSession._instantiatedSession = None
Do I need to add the following?
self._jvm.SparkSession.clearActiveSession()
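A sketch of what stop() might then look like (assuming the Python-side _activeSession cache this PR introduces, and the self._sc.stop() call from the existing method body):

def stop(self):
    """Stop the underlying :class:`SparkContext`."""
    self._sc.stop()
    # clear the JVM-side default and active sessions...
    self._jvm.SparkSession.clearDefaultSession()
    self._jvm.SparkSession.clearActiveSession()
    # ...and the Python-side caches
    SparkSession._instantiatedSession = None
    SparkSession._activeSession = None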
To test for getActiveSession when there is no active session, I am thinking of adding:
def test_get_active_session_when_no_active_session(self):
    spark = SparkSession.builder \
        .master("local") \
        .getOrCreate()
    spark.stop()
    active = spark.getActiveSession()
    self.assertEqual(active, None)
The test didn't pass because in stop(), the active session is not cleared.
Yes, that sounds like the right approach and I think we need that.
python/pyspark/sql/session.py
Outdated
cc @ueshin
python/pyspark/sql/tests.py
Outdated
nit: let's just name it spark_context and spark_session
I don't strongly agree here. I think, given that the method names are camel case in SparkSession & SparkContext in Python, this naming is perfectly reasonable.
python/pyspark/sql/tests.py
Outdated
Do we need to extend ReusedSQLTestCase? Looks like we can just use unittest.TestCase.
@HyukjinKwon there's no strong need for it; however, it does mean the first getOrCreate will already have a session it can use. But given that we set up and tear down the session, this may be less than ideal.
python/pyspark/sql/tests.py
Outdated
Ditto for naming. Let's just follow Python's convention for those names.
holdenk left a comment
Left some small comments, looking forward to seeing the fix on the stop side as well :)
Test build #96451 has finished for PR 22295 at commit
python/pyspark/sql/session.py
Outdated
Let's change this to 2.5
@HyukjinKwon are you OK to mark this comment as resolved since we're now targeting 3.0?
Yes, at that time 2.5 was targeted. Now 3.0 is targeted, per 9bf397c.
LGTM except the 3.0 to 2.5; I'll change that during the merge.
nvm, the merge script only triggers the edits if we have conflicts. If you can update 3.0 to 2.5 I'd be happy to merge.
python/pyspark/sql/session.py
Outdated
@huaxingao, can you check if the active session is set? For instance, when we createDataFrame? From a cursory look, we are not setting it.
@HyukjinKwon It seems to me that the active session is set OK in __init__. When we createDataFrame, we already have a session, and the active session was already set in __init__.
"When we createDataFrame, we already have a session"
But wouldn't we fail to set the active session properly if session A sets an active session in __init__, then session B sets an active session in __init__, and then session A calls createDataFrame?
@HyukjinKwon Do you mean something like this:
def test_two_spark_session(self):
    session1 = None
    session2 = None
    try:
        session1 = SparkSession.builder.config("key1", "value1").getOrCreate()
        session2 = SparkSession.builder.config("key2", "value2").getOrCreate()
        self.assertEqual(session1, session2)
        df = session1.createDataFrame([(1, 'Alice')], ['age', 'name'])
        self.assertEqual(df.collect(), [Row(age=1, name=u'Alice')])
        activeSession1 = session1.getActiveSession()
        activeSession2 = session2.getActiveSession()
        self.assertEqual(activeSession1, activeSession2)
    finally:
        if session1 is not None:
            session1.stop()
        if session2 is not None:
            session2.stop()
Similar. I was expecting something like:
session1 = SparkSession.builder.config("key1", "value1").getOrCreate()
session2 = SparkSession.builder.config("key2", "value2").getOrCreate()
assert(session2 == SparkSession.getActiveSession())
session1.createDataFrame([(1, 'Alice')], ['age', 'name'])
assert(session1 == SparkSession.getActiveSession())
Does this work?
So @HyukjinKwon in this code session1 and session2 are already equal:
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/

Using Python version 3.6.5 (default, Apr 29 2018 16:14:56)
SparkSession available as 'spark'.
>>> session1 = SparkSession.builder.config("key1", "value1").getOrCreate()
>>> session2 = SparkSession.builder.config("key2", "value2").getOrCreate()
>>> session1
<pyspark.sql.session.SparkSession object at 0x7ff6d4843b00>
>>> session2
<pyspark.sql.session.SparkSession object at 0x7ff6d4843b00>
>>> session1 == session2
True
That being said, having multiple Spark sessions in Python is doable; you manually have to call the init, e.g.:
>>> session3 = SparkSession(sc)
>>> session3
<pyspark.sql.session.SparkSession object at 0x7ff6d3dbd160>
And supporting that is reasonable.
If we're going to support this, we should have a test for it; or if we aren't going to support this right now, we should document the behaviour.
Oh, okay. I should have been more explicit. I meant:
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> SparkSession.getActiveSession
res0: Option[org.apache.spark.sql.SparkSession] = Some(org.apache.spark.sql.SparkSession@3ef4a8fb)

scala> val session1 = spark
session1: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@3ef4a8fb

scala> val session2 = spark.newSession()
session2: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@4b74a4d

scala> SparkSession.getActiveSession
res1: Option[org.apache.spark.sql.SparkSession] = Some(org.apache.spark.sql.SparkSession@3ef4a8fb)

scala> session2.createDataFrame(Seq(Tuple1(1)))
res2: org.apache.spark.sql.DataFrame = [_1: int]

scala> SparkSession.getActiveSession
res3: Option[org.apache.spark.sql.SparkSession] = Some(org.apache.spark.sql.SparkSession@4b74a4d)
@holdenk @HyukjinKwon
Thanks for the comments. I looked at the Scala code; it calls setActiveSession in createDataFrame:
def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = {
  SparkSession.setActiveSession(this)
  ...
}
I will do the same for Python:
def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=True):
    SparkSession._activeSession = self
    self._jvm.SparkSession.setActiveSession(self._jsparkSession)
Will also add a test.
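A sketch of such a test (hypothetical name; newSession() gives a second Python session over the same context):

def test_active_session_follows_create_dataframe(self):
    session1 = SparkSession.builder.master("local").getOrCreate()
    try:
        session2 = session1.newSession()
        session2.createDataFrame([(1, 'Alice')], ['age', 'name'])
        # createDataFrame should have re-pointed the active session at session2
        self.assertEqual(session2, SparkSession.getActiveSession())
    finally:
        session1.stop()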
Test build #96707 has finished for PR 22295 at commit
I just saw this fix: [SPARK-25525][SQL][PYSPARK] Do not update conf for existing SparkContext in SparkSession.getOrCreate. #22545
Test build #96708 has finished for PR 22295 at commit
python/pyspark/sql/session.py
Outdated
Wait... this should be a class method, since the Scala usage is SparkSession.getActiveSession.
I think the class method should initialize the JVM if it doesn't exist yet (see functions.py). Probably the SparkContext too. If one exists, it should use the existing one.
Also, let's define this as a property, since that's closer to Scala's usage.
I know it's difficult to define a static property. You can refer to https://github.com/graphframes/graphframes/pull/169/files#diff-e81e6b169c0aa35012a3263b2f31b330R381, or we should consider adding this as a function.
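For reference, a minimal sketch of the descriptor pattern that link uses to fake a static property (names here are hypothetical, not Spark's API):

class classproperty(property):
    """A read-only property accessible on the class itself."""
    def __get__(self, obj, objtype=None):
        # delegate to the wrapped function, passing the class rather than an instance
        return self.fget(objtype)

class Demo(object):
    _active = "demo-session"

    @classproperty
    def activeSession(cls):
        return cls._active

print(Demo.activeSession)  # prints 'demo-session' without an instance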
Test build #96831 has finished for PR 22295 at commit
Test build #96833 has finished for PR 22295 at commit
python/pyspark/sql/session.py
Outdated
The problem here is when we share a single JVM, as in Zeppelin. It should get the session from the JVM.
Do you mean in a multi-language notebook environment?
Yup.
@HyukjinKwon I am not sure if I follow your suggestion correctly. Does the following look right to you?
session.py:
@classmethod
@since(3.0)
def getActiveSession(cls):
    from pyspark.sql import functions
    return functions.getActiveSession()
functions.py:
@since(3.0)
def getActiveSession():
    from pyspark.sql import SparkSession
    sc = SparkContext._active_spark_context
    if sc is None:
        sc = SparkContext()
    if sc._jvm.SparkSession.getActiveSession().isDefined():
        SparkSession(sc, sc._jvm.SparkSession.getActiveSession().get())
        return SparkSession._activeSession
    else:
        return None
Yeah, it should look like that.
python/pyspark/sql/session.py
Outdated
Let's target this for 3.0. Per 9bf397c, it looks like we are going ahead with 3.0 now.
Test build #97124 has finished for PR 22295 at commit
holdenk left a comment
Thanks for working on this. I have some questions but I think we're getting really close :)
python/pyspark/sql/functions.py
Outdated
If this is being done to simplify the implementation and we don't expect people to call it directly, we should mention that in the docstring and also use an _ prefix.
I disagree with @HyukjinKwon about this behaviour being what people would expect -- it doesn't match the Scala behaviour, and one of the reasons to have something like getActiveSession() instead of getOrCreate() is to allow folks to do one thing if we have an active session and something else if we don't.
What about: if sc is None, we just return None, since we can't have an active session without an active SparkContext -- does that sound reasonable?
That being said, if folks feel strongly about this, I'm OK with us setting up a SparkContext, but we need to document that if that's the path we go.
Yeah, we should match the behaviour with the Scala side -- that was my point, essentially. The problem with the previous approach was that the session was being handled within Python -- I believe we should basically reuse the JVM's session implementation rather than reimplementing separate Python session support on the PySpark side.
"What about: if sc is None, we just return None, since we can't have an active session without an active SparkContext -- does that sound reasonable?"
In that case, I think we should follow Scala's behaviour.
@holdenk @HyukjinKwon
Thanks for the comments.
I checked Scala's behavior:
test("my test") {
val cx = SparkContext.getActive
val session = SparkSession.getActiveSession
println(cx)
println(session)
}
The result is:
None
None
So it returns None if sc is None. Actually, my current code returns None if sc is None, but I will change the code a bit to make it more obvious. I will also add an _ prefix to the function name and mention in the docstring that this function is not supposed to be called directly.
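The guard can be as simple as this sketch (assuming SparkContext._active_spark_context is the Python-side handle, as used earlier in this thread):

sc = SparkContext._active_spark_context
if sc is None:
    # no active SparkContext, hence no active session -- matches Scala
    return None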
python/pyspark/sql/tests.py
Outdated
Given the change to how we construct the SparkSession, can we add a test that makes sure we do whatever we decide to with the SparkContext?
Thanks @holdenk
I will add a test for the above comment and also add a test for your comment regarding
self._jvm.SparkSession.setActiveSession(self._jsparkSession)
Test build #97466 has finished for PR 22295 at commit
Test build #97502 has finished for PR 22295 at commit
Test build #97503 has finished for PR 22295 at commit
python/pyspark/sql/functions.py
Outdated
Eh... why is it in functions.py? I thought it should be in getActiveSession in session.py.
Do you mean the _ prefix or the function itself?
I mean the function itself...
python/pyspark/sql/tests.py
Outdated
I think you can put this in a try-finally.
Will change. Thanks!
python/pyspark/sql/tests.py
Outdated
Let's just rename SparkSession -> session above.
Will change. Thanks!
Looks close to go.
Test build #97577 has finished for PR 22295 at commit
holdenk left a comment
LGTM
I'll leave this for if @HyukjinKwon has any final comments, otherwise I'm happy to merge.
Merged to master for 3.0. Thanks for fixing this @huaxingao :)
Thank you very much for your help!! @holdenk @HyukjinKwon
What changes were proposed in this pull request?
Add getActiveSession in session.py.
How was this patch tested?
Added a doctest.
Closes apache#22295 from huaxingao/spark25255.
Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Holden Karau <holden@pigscanfly.ca>
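For reference, a sketch of roughly what the added classmethod and doctest look like, built from the drafts discussed above (illustrative, not the verbatim merged code):

@classmethod
@since(3.0)
def getActiveSession(cls):
    """Returns the active SparkSession for the current thread, returned by
    the builder, or None if no session is active.

    >>> s = SparkSession.getActiveSession()
    >>> df = s.createDataFrame([('Alice', 1)], ['name', 'age'])
    >>> df.select("age").collect()
    [Row(age=1)]
    """
    from pyspark import SparkContext
    sc = SparkContext._active_spark_context
    if sc is None:
        # no active SparkContext, hence no active session (matches Scala)
        return None
    if sc._jvm.SparkSession.getActiveSession().isDefined():
        SparkSession(sc, sc._jvm.SparkSession.getActiveSession().get())
        return SparkSession._activeSession
    return None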