
Conversation

@RussellSpitzer
Member

@RussellSpitzer RussellSpitzer commented Aug 3, 2018

Master

What changes were proposed in this pull request?

Previously PySpark used the private constructor for SparkSession when
building that object. This resulted in a SparkSession that never checked
the sql.extensions parameter for additional session extensions. To fix
this, we instead use the SparkSession.builder() path, as SparkR does; this
loads the extensions and allows their use in PySpark.
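
For illustration, a minimal sketch of what this enables from the user side (the extension class name below is a hypothetical placeholder for a JVM-side SparkSessionExtensions configurator on the driver classpath):

    from pyspark.sql import SparkSession

    # "com.example.MyExtensions" is hypothetical; spark.sql.extensions is the
    # static configuration that PySpark now honors when building the JVM session.
    spark = SparkSession.builder \
        .config("spark.sql.extensions", "com.example.MyExtensions") \
        .getOrCreate()
    # Rules, parsers, etc. injected by the extension are now active in queries
    # issued from this PySpark session.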

How was this patch tested?

An integration test was added which mimics the Scala test for the same feature.

Please review http://spark.apache.org/contributing.html before opening a pull request.

@SparkQA

SparkQA commented Aug 3, 2018

Test build #94149 has finished for PR 21990 at commit f790ae0.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Previously PySpark used the private constructor for SparkSession when
building that object. This resulted in a SparkSession that never checked
the sql.extensions parameter for additional session extensions. To fix
this, we instead use the SparkSession.builder() path, as SparkR does; this
loads the extensions and allows their use in PySpark.
@SparkQA

SparkQA commented Aug 3, 2018

Test build #94151 has finished for PR 21990 at commit 84c2513.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@RussellSpitzer RussellSpitzer changed the title [SPARK-25003][PYSPARK] Use SessionExtensions in Pyspark [SPARK-25003][PYSPARK][MASTER] Use SessionExtensions in Pyspark Aug 6, 2018
@HyukjinKwon
Member

@RussellSpitzer, let's close the other ones except for this and name it [SPARK-25003][PYSPARK] Use SessionExtensions in Pyspark. Let me review this one within a few days. Also, I don't think we should do it, at least not for branch-2.2. This logic here is quite convoluted and I would rather avoid backporting even to branch-2.3, actually.

@RussellSpitzer RussellSpitzer changed the title [SPARK-25003][PYSPARK][MASTER] Use SessionExtensions in Pyspark [SPARK-25003][PYSPARK] Use SessionExtensions in Pyspark Aug 7, 2018
jsparkSession = self._jvm.SparkSession(self._jsc.sc())
jsparkSession = self._jvm.SparkSession.builder() \
    .sparkContext(self._jsc.sc()) \
    .getOrCreate()
Member

@RussellSpitzer, mind checking the getOrCreate logic on the Scala side and deduplicating it here while we are at it? Some logic, for instance setting the default session, is duplicated here on the Python side and there on the Scala side.

It would be nicer if we had some tests as well. spark.sql.extensions is a static configuration, right? In that case, we could add a test; for example, please refer to #21007, where I added a test with a static configuration before.
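
For reference, a rough sketch of the kind of test being suggested here (this is not the test from #21007 nor the one eventually added; the extension class name is a hypothetical JVM-side helper assumed to be on the test classpath):

    import unittest
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SparkSession

    class SparkExtensionsTestSketch(unittest.TestCase):
        def setUp(self):
            # spark.sql.extensions is static, so it has to be set before the
            # context/session is created.
            conf = SparkConf().set(
                "spark.sql.extensions",
                "org.apache.spark.sql.MyTestExtensions")  # hypothetical class
            self.sc = SparkContext(conf=conf)
            self.spark = SparkSession(self.sc)

        def tearDown(self):
            self.spark.stop()

        def test_extensions_loaded(self):
            # A real test would run a query that only succeeds (or changes plan)
            # when the injected rule/parser is active.
            self.assertIsNotNone(self.spark._jsparkSession)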

Member Author

Yeah, let me add in the test, and then I'll clear out all the Python duplication of Scala code. I can make it more of a wrapper and less of a reimplementation.

@HyukjinKwon
Member

@RussellSpitzer, also please ping me here if you face any difficulties. I am willing to help or push some changes to your branch.

@SparkQA

SparkQA commented Aug 16, 2018

Test build #94866 has finished for PR 21990 at commit 7ba70b5.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class SparkExtensionsTest(unittest.TestCase, SQLTestUtils):

@SparkQA

SparkQA commented Aug 16, 2018

Test build #94870 has finished for PR 21990 at commit d5c37b7.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class SparkExtensionsTest(unittest.TestCase, SQLTestUtils):

Member

This wouldn't be needed, since I added this to test whether the callback is called or not in the PR pointed out.

Member Author

Sounds good to me, I'll take that out.

@SparkQA

SparkQA commented Aug 17, 2018

Test build #94873 has finished for PR 21990 at commit 0eea205.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class SparkExtensionsTest(unittest.TestCase, SQLTestUtils):

@SparkQA

SparkQA commented Aug 20, 2018

Test build #94975 has finished for PR 21990 at commit 21ff627.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class SparkExtensionsTest(unittest.TestCase, SQLTestUtils):

@RussellSpitzer
Member Author

@HyukjinKwon So I've been staring at this for a while today, and I guess the big issue is that we always need to make a Python SparkContext to get a handle on the JavaGateway, so everything that happens before the context is made cannot just be a wrapper around SQL methods and must be reimplemented. Unless we decide to refactor the code so that the JVM is more generally available (probably not possible), we will be stuck with redoing code in Python ...
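
As a small illustration of the constraint described above (this is just standard PySpark behavior, not something introduced by this patch):

    from pyspark import SparkContext

    sc = SparkContext()   # creating the context is what launches the JVM / py4j gateway
    jvm = sc._jvm         # the JVM handle only exists once the context is up
    # Any session-setup logic that has to run before this point cannot simply
    # delegate to the Scala implementation and must be redone in Python.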

Member

@HyukjinKwon HyukjinKwon left a comment

@RussellSpitzer, does that mean it's difficult to deduplicate the logic between here and getOrCreate on the Scala side? Will take a close look soon anyway.

This looks like it had better be addressed, since the current change actually executes quite duplicated code paths there ..

@RussellSpitzer
Member Author

What I wanted was to just call the Scala methods, instead of having half the code in Scala and half in Python, but we create the JVM in the SparkContext creation code, so this ends up not being a good approach, I think. We could just translate the rest of getOrCreate into Python, but then every time there is a patch to the code in Scala it will need a Python mod as well.

Member

nit: SparkSessionExtensionSuite. -> SparkSessionExtensionSuite

@HyukjinKwon
Member

We could just translate the rest of getOrCreate into Python, but then every time there is a patch to the code in Scala it will need a Python mod as well.

@RussellSpitzer, I actually think we already duplicate some code on the Python side and the Scala side in this code path now (see getOrCreate on the Scala side and __init__ on the Python side). They are now executed in duplicate with this change. I believe this isn't an orthogonal thing to handle separately.

If that requires a bit of duplicated code on the Python side to avoid executing duplicated code paths, then I think that can be a workaround to get through for now.

Adds a test which sets spark.sql.extensions to a custom extension
class. This mirrors the SparkExtensionsSuite, which does the
same thing in Scala.
@RussellSpitzer
Member Author

@HyukjinKwon so you want me to rewrite the code in Python? I will note SparkR is doing this exact same thing.

@SparkQA

SparkQA commented Sep 18, 2018

Test build #96199 has finished for PR 21990 at commit def4f3e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class SparkExtensionsTest(unittest.TestCase, SQLTestUtils):

@RussellSpitzer
Member Author

Added a new method of injecting extensions; this way the getOrCreate code from the Scala method is not needed. @HyukjinKwon

@SparkQA

SparkQA commented Sep 19, 2018

Test build #96253 has finished for PR 21990 at commit 7775c08.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Previously the only way to add extensions to the session was via the
getOrCreate method of the SparkSession Builder. To facilitate non-Scala
session creation we add a new constructor which takes in just the
context and extensions. Then we also add a new Extensions constructor
which, given a SparkConf, generates an Extensions object with the user
config already applied.
@SparkQA

SparkQA commented Sep 19, 2018

Test build #96257 has finished for PR 21990 at commit 67d9772.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@holdenk
Contributor

holdenk commented Sep 27, 2018

I'm +1 on switching to the builder and not using the private interface.

@RussellSpitzer
Member Author

I'm fine with anything, really; I still think the ideal solution is probably not to tie the creation of the py4j gateway to the SparkContext, but that's probably a much bigger refactor.

@SparkQA

SparkQA commented Oct 1, 2018

Test build #96825 has finished for PR 21990 at commit 27c9d28.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 1, 2018

Test build #96828 has finished for PR 21990 at commit 4ddaff8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 1, 2018

Test build #96827 has finished for PR 21990 at commit cf7cf75.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

    jsparkSession = self._jvm.SparkSession.getDefaultSession().get()
else:
    jsparkSession = self._jvm.SparkSession(self._jsc.sc())

Member

Oh haha, let's get rid of this change

Member Author

I'm addicted to whitespace apparently

 * Initialize extensions if the user has defined a configurator class in their SparkConf.
 * This class will be applied to the extensions passed into this function.
 */
private[sql] def applyExtensionsFromConf(conf: SparkConf, extensions: SparkSessionExtensions) {
Member

Let's make it private


private[sql] def this(sc: SparkContext) {
  this(sc, None, None, new SparkSessionExtensions)
  SparkSession.applyExtensionsFromConf(sc.getConf, this.extensions)
Member

Let's add some comments on why this is only here in this constructor. It might look weird that this constructor specifically needs to run applyExtensionsFromConf alone.

"The callback from the query execution listener should be called after 'toPandas'")


class SparkExtensionsTest(unittest.TestCase, SQLTestUtils):
Member

I think SQLTestUtils is not needed.

 * This class will be applied to the extensions passed into this function.
 */
private[sql] def applyExtensionsFromConf(conf: SparkConf, extensions: SparkSessionExtensions) {
  val extensionConfOption = conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
Member

I think we can even pass only conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS) as its argument instead of SparkConf, and name it applyExtensions.

Member

@HyukjinKwon HyukjinKwon left a comment

LGTM otherwise

 * Initialize extensions if the user has defined a configurator class in their SparkConf.
 * This class will be applied to the extensions passed into this function.
 */
private[sql] def applyExtensionsFromConf(conf: SparkConf, extensions: SparkSessionExtensions) {
Member

How about returning SparkSessionExtensions from this method, and modifying the secondary constructor of SparkSession as:

private[sql] def this(sc: SparkContext) {
  this(sc, None, None,
    SparkSession.applyExtensionsFromConf(sc.getConf, new SparkSessionExtensions))
}

I'm a little worried about whether the order in which we apply extensions might matter.

Member

On second thought, we could move the method call to the top of the default constructor?

Member Author

The default constructor of SparkSession?

Member Author

I thought about this and was worried then about multiple invocations of the extensions, once every time the SparkSession is cloned.

Member Author

It's difficult here since I'm attempting to cause the least change in behavior for the old code paths :(

Member Author

I am always a little nervous about having functions return objects they take in as parameters and then modify; it gives me the impression that they are stateless. If you think that this is clearer, I can make the change.

Member

@ueshin ueshin Oct 17, 2018

I see, but in that case, we need to ensure that no injection of extensions is used in the default constructor to avoid initializing without injections from the conf.

Member

Eh .. I think it's okay to have a function that returns the updated extensions.

Member

Actually either way looks okay.

Member Author

Updated with replacement then :)

@SparkQA

SparkQA commented Oct 16, 2018

Test build #97471 has finished for PR 21990 at commit 8ab76d4.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@RussellSpitzer
Member Author

Addressed comments from @HyukjinKwon. I'm interested in @ueshin's suggestions, but I can't figure out how to do that unless we bake it into the Extensions constructor. If we place it in the Session's constructor, then invocations of "newSession" will reapply existing extensions. I added a note in the code.

Removes SparkConf from applyExtensions; it now only accepts an Optional
string which can contain a class name for extensions. Removed errant
whitespace.
@SparkQA

SparkQA commented Oct 17, 2018

Test build #97472 has finished for PR 21990 at commit d9b2a55.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

It now returns the extensions it modifies.
@SparkQA

SparkQA commented Oct 17, 2018

Test build #97497 has finished for PR 21990 at commit 3629c78.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@ueshin
Member

ueshin commented Oct 18, 2018

LGTM.

@SparkQA

SparkQA commented Oct 18, 2018

Test build #97510 has finished for PR 21990 at commit 3629c78.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

Merged to master.

@asfgit asfgit closed this in c3eaee7 Oct 18, 2018
@RussellSpitzer RussellSpitzer deleted the SPARK-25003-master branch January 11, 2019 19:41
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
Closes apache#21990 from RussellSpitzer/SPARK-25003-master.

Authored-by: Russell Spitzer <Russell.Spitzer@gmail.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
nateagr pushed a commit to nateagr/spark that referenced this pull request Dec 4, 2020
@shiyuhang0

Why not port it to Spark < 3?
