[SPARK-25003][PYSPARK] Use SessionExtensions in Pyspark #21990
Conversation
|
Test build #94149 has finished for PR 21990 at commit
|
Previously PySpark used the private constructor for SparkSession when building that object. This resulted in a SparkSession created without checking the spark.sql.extensions parameter for additional session extensions. To fix this we instead use the SparkSession.builder() path, as SparkR does, which loads the extensions and allows their use in PySpark.
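For context, the class that spark.sql.extensions points to is a Scala function from SparkSessionExtensions to Unit. A minimal sketch of such a configurator (the package, class, and injected rule below are hypothetical, purely for illustration):

package com.example.sql

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical configurator; spark.sql.extensions would be set to
// "com.example.sql.MyExtensions" so the session builder can load and apply it.
class MyExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // Inject a no-op optimizer rule, just to show the injection API.
    extensions.injectOptimizerRule { session =>
      new Rule[LogicalPlan] {
        override def apply(plan: LogicalPlan): LogicalPlan = plan
      }
    }
  }
}

With this change, a PySpark session built with that configuration picks up the injected rule, just as a Scala or R session would.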
|
Test build #94151 has finished for PR 21990 at commit
|
|
@RussellSpitzer, let's close the other ones except for this one and name it |
python/pyspark/sql/session.py
Outdated
jsparkSession = self._jvm.SparkSession(self._jsc.sc())
jsparkSession = self._jvm.SparkSession.builder() \
    .sparkContext(self._jsc.sc()) \
    .getOrCreate()
@RussellSpitzer, mind checking the getOrCreate logic on the Scala side and deduplicating it here while we are at it? Some logic, for instance setting the default session, is duplicated here on the Python side and there on the Scala side.
It would be nicer if we had some tests as well. spark.sql.extensions is a static configuration, right? In that case, we could add a test; for example, please refer to #21007, where I added a test with a static configuration before.
Yeah, let me add in the test, and then I'll clear out all the Python duplication of Scala code. I can make it more of a wrapper and less of a reimplementation.
|
@RussellSpitzer, also please ping me here if you face any difficulties. I am willing to help or push some changes to your branch. |
|
Test build #94866 has finished for PR 21990 at commit
|
|
Test build #94870 has finished for PR 21990 at commit
|
python/pyspark/sql/tests.py
Outdated
This wouldn't be needed, since I added it for testing whether the callback is called in the PR pointed out above.
Sounds good to me, I'll take that out.
|
Test build #94873 has finished for PR 21990 at commit
|
|
Test build #94975 has finished for PR 21990 at commit
|
|
@HyukjinKwon So I've been staring at this for a while today, and I guess the big issue is that we always need to make a Python SparkContext to get a handle on the JavaGateway, so everything that happens before the context is made cannot just be a wrapper around the SQL methods and must be reimplemented. Unless we decide to refactor the code so that the JVM is more generally available (probably not possible), we will be stuck with redoing code in Python ... |
HyukjinKwon left a comment
@RussellSpitzer, does that mean it's difficult to deduplicate the logic between here and getOrCreate on the Scala side? I will take a closer look soon anyway.
This had better be addressed, since the current change actually executes quite duplicated code paths there ..
|
What I wanted was to just call the Scala methods, instead of having half the code in Scala and half in Python, but we create the JVM in the SparkContext creation code, so this ends up not being a good approach, I think. We could just translate the rest of getOrCreate into Python, but then every time the Scala code is patched it will need a Python modification as well. |
python/pyspark/sql/tests.py
Outdated
nit: SparkSessionExtensionSuite. -> SparkSessionExtensionSuite
@RussellSpitzer, I actually think we already duplicate some code between the Python side and the Scala side at this code path now (see ...). If that requires a bit of duplicated code on the Python side to avoid executing duplicated code paths, then I think that can be a workaround to get through for now. |
Adds a test which sets spark.sql.extensions to a custom extension class. This is the same as the SparkExtensionsSuite which does the same thing in Scala.
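For reference, the Scala-side pattern that suite follows is roughly the sketch below, reusing the hypothetical com.example.sql.MyExtensions class from earlier; this is only an illustration, not the actual suite code:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Build a session with the static spark.sql.extensions conf set and verify
// the configuration reached the session; a real test would go further and
// assert that the injected rule or strategy actually takes effect.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("extensions-test")
  .set("spark.sql.extensions", "com.example.sql.MyExtensions")

val spark = SparkSession.builder().config(conf).getOrCreate()
try {
  assert(spark.conf.get("spark.sql.extensions") == "com.example.sql.MyExtensions")
} finally {
  spark.stop()
}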
|
@HyukjinKwon so you want me to rewrite the code in Python? I will note that SparkR is doing this exact same thing. |
|
Test build #96199 has finished for PR 21990 at commit
|
|
Added a new method of injecting extensions; this way the getOrCreate code from the Scala method is not needed. @HyukjinKwon |
|
Test build #96253 has finished for PR 21990 at commit
|
Previously the only way to add extensions to the session was via the getOrCreate method of the SparkSession Builder. To facilitate non-Scala session creation we add a new constructor which takes in just the context and extensions. Then we also add a new Extensions constructor which, given a SparkConf, generates an Extensions object with the user config already applied.
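Read literally, the session-side half of that commit amounts to something like the following. This is only a sketch of the shape described, not the exact diff from this PR:

// Sketch: a SparkSession auxiliary constructor that takes just the context and an
// already-configured extensions object, so non-Scala callers (py4j, SparkR) can
// create a session without going through Builder.getOrCreate().
private[sql] def this(sc: SparkContext, extensions: SparkSessionExtensions) = {
  this(sc, None, None, extensions)
}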
|
Test build #96257 has finished for PR 21990 at commit
|
|
I'm +1 on switching to the builder and not using the private interface. |
|
I'm fine with anything, really. I still think the ideal solution is probably not to tie the creation of the py4j gateway to the SparkContext, but that's probably a much bigger refactor. |
|
Test build #96825 has finished for PR 21990 at commit
|
|
Test build #96828 has finished for PR 21990 at commit
|
|
Test build #96827 has finished for PR 21990 at commit
|
python/pyspark/sql/session.py
Outdated
    jsparkSession = self._jvm.SparkSession.getDefaultSession().get()
else:
    jsparkSession = self._jvm.SparkSession(self._jsc.sc())
|
|
Oh haha, let's get rid of this change
I'm addicted to whitespace apparently
 * Initialize extensions if the user has defined a configurator class in their SparkConf.
 * This class will be applied to the extensions passed into this function.
 */
private[sql] def applyExtensionsFromConf(conf: SparkConf, extensions: SparkSessionExtensions) {
Let's make it private
|
|
private[sql] def this(sc: SparkContext) {
  this(sc, None, None, new SparkSessionExtensions)
  SparkSession.applyExtensionsFromConf(sc.getConf, this.extensions)
Let's add some comments on why this is only here in this constructor. It might look odd that this constructor specifically is the only one that needs to run applyExtensionsFromConf.
python/pyspark/sql/tests.py
Outdated
| "The callback from the query execution listener should be called after 'toPandas'") | ||
|
|
||
|
|
||
| class SparkExtensionsTest(unittest.TestCase, SQLTestUtils): |
I think SQLTestUtils is not needed.
 * This class will be applied to the extensions passed into this function.
 */
private[sql] def applyExtensionsFromConf(conf: SparkConf, extensions: SparkSessionExtensions) {
  val extensionConfOption = conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
I think we can even pass just conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS) as its argument instead of the SparkConf, and name it applyExtensions.
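A rough sketch of that suggested shape (illustrative only; the class loading and error handling mirror what Builder.getOrCreate already does, and the final merged code may differ):

// Apply the extensions class named by spark.sql.extensions, if any, to the
// given SparkSessionExtensions instance.
private[sql] def applyExtensions(
    extensionOption: Option[String],
    extensions: SparkSessionExtensions): Unit = {
  extensionOption.foreach { extensionConfClassName =>
    try {
      val extensionConfClass = Utils.classForName(extensionConfClassName)
      val extensionConf = extensionConfClass.getConstructor().newInstance()
        .asInstanceOf[SparkSessionExtensions => Unit]
      extensionConf(extensions)
    } catch {
      case _: ClassCastException | _: ClassNotFoundException | _: NoClassDefFoundError =>
        // Ignore and continue without the configured extensions;
        // the existing getOrCreate code logs a warning here.
    }
  }
}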
HyukjinKwon left a comment
LGTM otherwise
 * Initialize extensions if the user has defined a configurator class in their SparkConf.
 * This class will be applied to the extensions passed into this function.
 */
private[sql] def applyExtensionsFromConf(conf: SparkConf, extensions: SparkSessionExtensions) {
How about returning SparkSessionExtensions from this method, and modifying the secondary constructor of SparkSession as:
private[sql] def this(sc: SparkContext) {
  this(sc, None, None,
    SparkSession.applyExtensionsFromConf(sc.getConf, new SparkSessionExtensions))
}
I'm a little worried whether the order in which we apply the extensions might matter.
On second thought, we could move the method call to the top of the default constructor?
The default constructor of SparkSession?
I thought about this and was worried about multiple invocations of the extensions: once every time the SparkSession is cloned.
It's difficult here since I'm attempting to cause the least change in behavior for the old code paths :(
I am always a little nervous about having functions return objects they take in as parameters and then modify; it gives the impression that they are stateless when they actually mutate their input. If you think this is clearer, I can make the change.
I see, but in that case, we need to ensure that no injection of extensions is used in the default constructor to avoid initializing without injections from the conf.
Eh .. I think it's okay to have a function that returns the updated extensions.
Actually either way looks okay.
Updated with replacement then :)
|
Test build #97471 has finished for PR 21990 at commit
|
|
Addressed comments from @HyukjinKwon. I'm interested in @ueshin's suggestions, but I can't figure out how to do that unless we bake it into the Extensions constructor. If we place it in the Session's constructor, then invocations of "newSession" will reapply existing extensions. I added a note in the code. |
Removes SparkConf from applyExtensions; it now only accepts an Optional string which can contain a class name for extensions. Removed errant whitespace.
|
Test build #97472 has finished for PR 21990 at commit
|
It now returns the extensions it modifies.
|
Test build #97497 has finished for PR 21990 at commit
|
|
retest this please |
|
LGTM. |
|
Test build #97510 has finished for PR 21990 at commit
|
|
Merged to master. |
Master
Closes apache#21990 from RussellSpitzer/SPARK-25003-master.
Authored-by: Russell Spitzer <Russell.Spitzer@gmail.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
|
Why not port it to Spark < 3? |
Master
What changes were proposed in this pull request?
Previously PySpark used the private constructor for SparkSession when
building that object. This resulted in a SparkSession created without checking
the spark.sql.extensions parameter for additional session extensions. To fix
this we instead use the SparkSession.builder() path, as SparkR does, which
loads the extensions and allows their use in PySpark.
How was this patch tested?
An integration test was added which mimics the Scala test for the same feature.
Please review http://spark.apache.org/contributing.html before opening a pull request.