
Conversation

@huaxingao
Contributor

What changes were proposed in this pull request?

add getActiveSession in session.py

How was this patch tested?

add doctest

@SparkQA

SparkQA commented Aug 31, 2018

Test build #95507 has finished for PR 22295 at commit e9885b3.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 31, 2018

Test build #95509 has finished for PR 22295 at commit 89c3b44.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

Does this return JVM instance?

Contributor Author

@HyukjinKwon Sorry for the late reply. Yes, this returns a JVM instance.
In the Scala code, SparkSession.getActiveSession returns an Option[SparkSession].
I am not sure how to do a Python equivalent of Scala's Option. In the following code, is there a way to wrap the Python session in the else path into something equivalent to Scala's Option? If not, can I just return the Python session?

        if self._jsparkSession.getActiveSession() is None:
            return None
        else:
            return self.__class__(self._sc, self._jsparkSession.getActiveSession().get())
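Python has no built-in Option type; the idiomatic counterpart is returning either the value or None. The mapping being asked about can be sketched against a stub. FakeScalaOption and unwrap_option are hypothetical names standing in for the Py4J-wrapped Scala Option, not PySpark API:

```python
from typing import Optional


class FakeScalaOption:
    """Hypothetical stand-in for a Py4J-wrapped Scala Option."""

    def __init__(self, value=None):
        self._value = value

    def isDefined(self):
        return self._value is not None

    def get(self):
        if self._value is None:
            raise ValueError("Option is empty")
        return self._value


def unwrap_option(opt) -> Optional[object]:
    # Map Scala's Some(x) onto x, and Scala's None onto Python's None.
    return opt.get() if opt.isDefined() else None
```

With this mapping, the method can simply return None when no session is active, which is what the discussion below converges on.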

Member

Yeah, I think we should return the Python session; the JVM instance should not be exposed. I assume returning None is fine. The thing is, session support in PySpark is lacking; it's partially implemented but not very well tested as far as I can tell.

Can you add a set of tests for it, and manually test them as well? Actually, my gut says this is quite a big deal.

Contributor Author

@HyukjinKwon I added a set of tests. Some of them are borrowed from SparkSessionBuilderSuite.scala.

Contributor

So normally we try to have doctests like these be examples of how the user should use this. So I would consider getting the active session and then doing something a normal user would do with it (like parallelizing a collection).

Member

...and probably shouldn't access _jsparkSession.

Contributor Author

@holdenk @felixcheung Thanks for the review. I will change this.

@SparkQA

SparkQA commented Sep 8, 2018

Test build #95815 has finished for PR 22295 at commit cd87f06.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 10, 2018

Test build #95889 has finished for PR 22295 at commit 2345e55.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@huaxingao, let's target this 3.0.

@SparkQA

SparkQA commented Sep 11, 2018

Test build #95953 has finished for PR 22295 at commit 65fc45f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@holdenk holdenk left a comment

This looks really close. The one thing which I'd like to see added is a test for getActiveSession when there is no active session.

Contributor

Thanks for catching this! Filed a follow up https://issues.apache.org/jira/browse/SPARK-25432

Contributor Author

Thank you very much for your comments.
I have a question here. In the stop() method, shall we clear the active session too? Currently it has:

    def stop(self):
        """Stop the underlying :class:`SparkContext`.
        """
        self._jvm.SparkSession.clearDefaultSession()
        SparkSession._instantiatedSession = None

Do I need to add the following?

      self._jvm.SparkSession.clearActiveSession()

To test for getActiveSession when there is no active session, I am thinking of adding

    def test_get_active_session_when_no_active_session(self):
        spark = SparkSession.builder \
            .master("local") \
            .getOrCreate()
        spark.stop()
        active = spark.getActiveSession()
        self.assertEqual(active, None)

The test didn't pass because in stop(), the active session is not cleared.
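The lifecycle question above, whether stop() should also clear the active session, can be sketched with plain-Python state. _FakeJVMSession and MiniSession are hypothetical toy stand-ins for the JVM-side SparkSession companion object and PySpark's SparkSession, not real API:

```python
class _FakeJVMSession:
    """Hypothetical stand-in for the JVM-side SparkSession companion object."""
    _active = None
    _default = None

    @classmethod
    def setActiveSession(cls, session):
        cls._active = session

    @classmethod
    def clearActiveSession(cls):
        cls._active = None

    @classmethod
    def clearDefaultSession(cls):
        cls._default = None


class MiniSession:
    """Toy model of the PySpark session lifecycle."""
    _instantiatedSession = None
    _activeSession = None

    def __init__(self):
        MiniSession._instantiatedSession = self
        MiniSession._activeSession = self
        _FakeJVMSession.setActiveSession(self)

    def stop(self):
        # The fix under discussion: clear the active session
        # alongside the default session.
        _FakeJVMSession.clearDefaultSession()
        _FakeJVMSession.clearActiveSession()
        MiniSession._instantiatedSession = None
        MiniSession._activeSession = None

    @classmethod
    def getActiveSession(cls):
        return cls._activeSession
```

Without the clearActiveSession call in stop(), a stopped session would still be reported as active, which is exactly why the proposed test fails.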

Contributor

Yes, that sounds like the right approach and I think we need that.

Member

cc @ueshin

Member

nit: let's just name it spark_context and spark_session

Contributor

I don't strongly agree here. I think given that the method names are camel case in the SparkSession & SparkContext in Python this naming is perfectly reasonable.

Member

Do we need to extend ReusedSQLTestCase? Looks like we can just use unittest.TestCase.

Contributor

@HyukjinKwon there's no strong need for it; however, it does mean that the first getOrCreate will already have a session it can use, but given that we set up and tear down the session this may be less than ideal.

Member

ditto for naming. Let's just follow Python's convention in those names

Contributor

@holdenk holdenk left a comment

Left some small comments, looking forward to seeing the fix on the stop side as well :)

@SparkQA

SparkQA commented Sep 21, 2018

Test build #96451 has finished for PR 22295 at commit d7be3bf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

Let's change this to 2.5

Contributor

@HyukjinKwon are you OK to mark this comment as resolved since we're now targeting 3.0?

Member

Yes, at that time, 2.5 was targeted. Now 3.0 is targeted per 9bf397c

@holdenk
Contributor

holdenk commented Sep 27, 2018

LGTM except the 3.0 to 2.5 change; I'll change that during the merge.

@holdenk
Contributor

holdenk commented Sep 27, 2018

nvm, the merge script only triggers the edits if we have conflicts. If you can update 3.0 to 2.5 I'd be happy to merge.

Member

@huaxingao, can you check whether the active session is set, for instance when we call createDataFrame? From a cursory look, we are not setting it.

Contributor Author

@HyukjinKwon It seems to me that the active session is set OK in __init__. When we call createDataFrame, we already have a session, and the active session was already set in __init__.

Member

When createDataFrame, we already have a session

but wouldn't we fail to set the active session properly if session A sets an active session in __init__, then session B sets an active session in __init__, and then session A calls createDataFrame?

Contributor Author

@HyukjinKwon Do you mean something like this:

    def test_two_spark_session(self):
        session1 = None
        session2 = None
        try:
            session1 = SparkSession.builder.config("key1", "value1").getOrCreate()
            session2 = SparkSession.builder.config("key2", "value2").getOrCreate()
            self.assertEqual(session1, session2)

            df = session1.createDataFrame([(1, 'Alice')], ['age', 'name'])
            self.assertEqual(df.collect(), [Row(age=1, name=u'Alice')])
            activeSession1 = session1.getActiveSession()
            activeSession2 = session2.getActiveSession()
            self.assertEqual(activeSession1, activeSession2)

        finally:
            if session1 is not None:
                session1.stop()
            if session2 is not None:
                session2.stop()

Member

Similar. I was expecting something like:

session1 = SparkSession.builder.config("key1", "value1").getOrCreate()
session2 = SparkSession.builder.config("key2", "value2").getOrCreate()

assert(session2 == SparkSession.getActiveSession())

session1.createDataFrame([(1, 'Alice')], ['age', 'name'])

assert(session1 == SparkSession.getActiveSession())

does this work?

Contributor

So @HyukjinKwon in this code session1 and session2 are already equal:

    (PySpark shell banner, Spark version 2.3.1)
    Using Python version 3.6.5 (default, Apr 29 2018 16:14:56)
    SparkSession available as 'spark'.

    >>> session1 = SparkSession.builder.config("key1", "value1").getOrCreate()
    >>> session2 = SparkSession.builder.config("key2", "value2").getOrCreate()
    >>> session1
    <pyspark.sql.session.SparkSession object at 0x7ff6d4843b00>
    >>> session2
    <pyspark.sql.session.SparkSession object at 0x7ff6d4843b00>
    >>> session1 == session2
    True

That being said, having multiple Spark sessions in Python is doable; you manually have to call the constructor, e.g.:

    >>> session3 = SparkSession(sc)
    >>> session3
    <pyspark.sql.session.SparkSession object at 0x7ff6d3dbd160>

And supporting that is reasonable.
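The behaviour described here, getOrCreate reusing one cached session while a direct constructor call produces a fresh one, can be sketched with a toy class. TinySession is a hypothetical name, not PySpark API:

```python
class TinySession:
    """Toy model of SparkSession's getOrCreate caching."""
    _instantiated = None

    def __init__(self, context):
        self.context = context

    @classmethod
    def getOrCreate(cls, context="sc"):
        # Reuse the already-instantiated session if there is one;
        # builder configs on a second call are silently applied to it
        # rather than producing a new session.
        if cls._instantiated is None:
            cls._instantiated = cls(context)
        return cls._instantiated


session1 = TinySession.getOrCreate()
session2 = TinySession.getOrCreate()
session3 = TinySession("sc")  # direct constructor call bypasses the cache
```

This is why the two getOrCreate calls in the transcript above return the same object, while SparkSession(sc) does not.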

Contributor

If we're going to support this we should have test for it, or if we aren't going to support this right now we should document the behaviour.

Member

Oh, okay. I should have been explicit. I meant:

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> SparkSession.getActiveSession
res0: Option[org.apache.spark.sql.SparkSession] = Some(org.apache.spark.sql.SparkSession@3ef4a8fb)

scala> val session1 = spark
session1: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@3ef4a8fb

scala> val session2 = spark.newSession()
session2: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@4b74a4d

scala> SparkSession.getActiveSession
res1: Option[org.apache.spark.sql.SparkSession] = Some(org.apache.spark.sql.SparkSession@3ef4a8fb)

scala> session2.createDataFrame(Seq(Tuple1(1)))
res2: org.apache.spark.sql.DataFrame = [_1: int]

scala> SparkSession.getActiveSession
res3: Option[org.apache.spark.sql.SparkSession] = Some(org.apache.spark.sql.SparkSession@4b74a4d)

Contributor Author

@holdenk @HyukjinKwon
Thanks for the comments. I looked at the Scala code; it calls setActiveSession in createDataFrame:

  def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = {
    SparkSession.setActiveSession(this)
    ...
  }

I will do the same for Python:

    def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=True):
        SparkSession._activeSession = self
        self._jvm.SparkSession.setActiveSession(self._jsparkSession)

I will also add a test.

@SparkQA

SparkQA commented Sep 27, 2018

Test build #96707 has finished for PR 22295 at commit c966846.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@huaxingao
Contributor Author

I just saw this fix [SPARK-25525][SQL][PYSPARK] Do not update conf for existing SparkContext in SparkSession.getOrCreate. #22545
I will remove test_create_SparkContext_then_SparkSession.

@SparkQA

SparkQA commented Sep 27, 2018

Test build #96708 has finished for PR 22295 at commit 3e11d0a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

Wait, this should be a class method, since the Scala usage is SparkSession.getActiveSession.

Member

I think the class method should initialize the JVM if it doesn't exist (see functions.py), and probably the SparkContext too. If one exists, it should use the existing one.

Also, let's define this as a property since that's closer to Scala's usage.

I know it's difficult to define a static property. You can refer to https://github.com/graphframes/graphframes/pull/169/files#diff-e81e6b169c0aa35012a3263b2f31b330R381, or we should consider adding this as a function.
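The "static property" idea can be built with a small descriptor. This classproperty is a generic sketch of the pattern in the linked graphframes change, and Demo is a hypothetical example class, not PySpark code:

```python
class classproperty:
    """Descriptor exposing a read-only property on the class itself."""

    def __init__(self, fget):
        self.fget = fget

    def __get__(self, obj, objtype=None):
        # Invoked for both Demo.activeSession and instance.activeSession;
        # either way we dispatch on the class, like a static property.
        return self.fget(objtype)


class Demo:
    _active = "demo-session"

    @classproperty
    def activeSession(cls):
        return cls._active
```

Unlike a plain property, this works without an instance: Demo.activeSession returns the value directly rather than the descriptor object.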

@SparkQA

SparkQA commented Oct 1, 2018

Test build #96831 has finished for PR 22295 at commit 765cf27.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 1, 2018

Test build #96833 has finished for PR 22295 at commit b83cf8e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

The problem here is when we share a single JVM, as in Zeppelin. It should get the session from the JVM.

Contributor

Do you mean in a multi-language notebook environment?

Member

Yup.

Contributor Author

@HyukjinKwon I am not sure if I follow your suggestion correctly. Does the following look right to you?

In session.py:

    @classmethod
    @since(3.0)
    def getActiveSession(cls):
        from pyspark.sql import functions
        return functions.getActiveSession()

In functions.py:

    @since(3.0)
    def getActiveSession():
        from pyspark.sql import SparkSession
        sc = SparkContext._active_spark_context
        if sc is None:
            sc = SparkContext()

        if sc._jvm.SparkSession.getActiveSession().isDefined():
            SparkSession(sc, sc._jvm.SparkSession.getActiveSession().get())
            return SparkSession._activeSession
        else:
            return None

Member

Yea, it should look like that

Member

Let's target this to 3.0. Per 9bf397c, it looks like we are going ahead with 3.0 now.

@SparkQA

SparkQA commented Oct 8, 2018

Test build #97124 has finished for PR 22295 at commit 55f1b03.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@holdenk holdenk left a comment

Thanks for working on this. I have some questions but I think we're getting really close :)

Contributor

If this is being done to simplify the implementation and we don't expect people to call it directly, we should mention that in the docstring and also use an _ prefix.

I disagree with @HyukjinKwon about this behaviour being what people would expect -- it doesn't match the Scala behaviour and one of the reasons to have something like getActiveSession() instead of getOrCreate() is to allow folks to do something if we have an active session or do something else if we don't.

What about if sc is None we just return None, since we can't have an activeSession without an active SparkContext -- does that sound reasonable?

That being said if folks feel strongly about this I'm ok with us setting up a SparkContext but we need to document that if that's the path we go.

Member

Yeah, we should match the behaviour with the Scala side - that was my point essentially. The problem with the previous approach was that the session was being handled within Python - I believe we should basically reuse the JVM's session implementation rather than reimplementing separate Python session support on the PySpark side.

What about if sc is None we just return None, since we can't have an activeSession without an active SparkContext -- does that sound reasonable?

In that case, I think we should follow Scala's behaviour.

Contributor Author

@holdenk @HyukjinKwon
Thanks for the comments.
I checked Scala's behavior:

  test("my test") {
    val cx = SparkContext.getActive
    val session = SparkSession.getActiveSession
    println(cx)
    println(session)
  }

The result is

None
None

So it returns None if sc is None. Actually, my current code returns None if sc is None, but I will change the code a bit to make it more obvious. I will also add an _ prefix to the function name and mention in the docstring that this function is not supposed to be called directly.

Contributor

Given the change in how we construct the SparkSession, can we add a test that makes sure we do whatever we decide to do with the SparkContext?

Contributor Author

Thanks @holdenk
I will add a test for the above comment and also add a test for your comment regarding

self._jvm.SparkSession.setActiveSession(self._jsparkSession)

@SparkQA

SparkQA commented Oct 16, 2018

Test build #97466 has finished for PR 22295 at commit 1ee58af.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 17, 2018

Test build #97502 has finished for PR 22295 at commit 7c6d2d5.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 17, 2018

Test build #97503 has finished for PR 22295 at commit 56282da.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@HyukjinKwon HyukjinKwon Oct 18, 2018

Eh, why is it in functions.py? I thought it should be getActiveSession in session.py.

Contributor Author

Do you mean the _ prefix or the function itself?

Member

I mean the function itself.

Member

I think you can put this in try-finally

Contributor Author

Will change. Thanks!

Member

@HyukjinKwon HyukjinKwon Oct 18, 2018

Let's just rename the SparkSession above -> session.

Contributor Author

Will change. Thanks!

@HyukjinKwon
Member

Looks close to go.

@SparkQA

SparkQA commented Oct 19, 2018

Test build #97577 has finished for PR 22295 at commit 94e3db0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@holdenk holdenk left a comment

LGTM

@holdenk
Contributor

holdenk commented Oct 19, 2018

I'll leave this for if @HyukjinKwon has any final comments, otherwise I'm happy to merge.

@holdenk
Contributor

holdenk commented Oct 26, 2018

Merged to master for 3.0. Thanks for fixing this @huaxingao :)

@asfgit asfgit closed this in d367bdc Oct 26, 2018
@huaxingao
Contributor Author

Thank you very much for your help! @holdenk @HyukjinKwon

jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
## What changes were proposed in this pull request?

add getActiveSession  in session.py

## How was this patch tested?

add doctest

Closes apache#22295 from huaxingao/spark25255.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Holden Karau <holden@pigscanfly.ca>