[SPARK-11775][PYSPARK][SQL] Allow PySpark to register Java UDF #9766

Conversation
Test build #46078 has finished for PR 9766 at commit
Could anyone help review this? Thanks.
Test build #50066 has finished for PR 9766 at commit
please test it again. |
@@ -224,6 +224,10 @@ def registerFunction(self, name, f, returnType=StringType()):
        udf = UserDefinedFunction(f, returnType, name)
        self._ssql_ctx.udf().registerPython(name, udf._judf)

    def registerJavaFunction(self, name, javaClassName, returnType):
You'll probably want a since annotation as well as some PyDoc here.
Also add some tests.
I think this could be useful, although if people are already intermixing Scala (or Java) and Python code, maybe they should just write the registration code in Java and call the register function using py4j? This could be more convenient, though. Where do you think this would be most useful? |
Thanks @holdenk for reviewing this. One scenario is that the UDF has already been implemented in Java, so the user doesn't need to wrap it using py4j and register it as a Python UDF. This can be valuable when a company has its own UDF repository: each UDF is implemented once in one language (Java/Python), and PySpark users can register the UDF without wrapping it via py4j. |
Oh right, I wasn't thinking of wrapping the UDF and turning it into a Python UDF (that's going to kill performance) - but rather: if one has Scala/Java UDFs, is the overhead of registering them in Scala code and using them from Python (e.g. writing a function like https://github.com/sparklingpandas/sparklingpandas/blob/master/src/main/scala/com/sparklingpandas/AggregationUDFs.scala#L36 and then calling it with py4j) high enough that a wrapper to do it directly from Python is useful? |
Do you mean wrapping the Java/Scala UDF in Python and calling it through py4j? One concern is performance, because that way we still need to launch a Python process. And if you already have a Java/Scala UDF, why not register it directly in Python (which is what this ticket does) and call it using the DataFrame API? |
@zjffdu Will
@davies It doesn't seem to be supported, based on my experiment.
@zjffdu Maybe we should support registering a Java UDF using SQL (SQL users could also benefit from that). |
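To make the SQL route concrete, here is a sketch of the Hive-style DDL being alluded to. The statement shape follows Hive's `CREATE TEMPORARY FUNCTION`, and the class name is the test UDF referenced later in this thread; treat the snippet as illustrative, not as confirmation that Spark supported it at the time (the experiment above suggests it didn't).

```python
# Illustrative only: the Hive-style DDL that would let SQL users register a
# Java UDF directly, with no Python wrapper involved. The class name is the
# test class referenced later in this thread.
CREATE_FUNCTION_DDL = (
    "CREATE TEMPORARY FUNCTION javaStringLength "
    "AS 'test.org.apache.spark.sql.JavaStringLength'"
)

# A SQL user would then call it like any builtin function:
QUERY = "SELECT javaStringLength('test')"
```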
@zjffdu This seems useful; could you add docs and tests for it? |
@davies Sure, I will add tests and docs for it. |
Test build #58468 has finished for PR 9766 at commit
Test build #58482 has finished for PR 9766 at commit
@zjffdu
Without this feature, you can only call the builtin UDFs (org.apache.spark.sql.functions), but can't register a custom UDF. This PR allows users to register their own custom Java UDFs. |
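For readers of the thread, a hedged sketch of how the new API is meant to be called from PySpark, based on the `registerJavaFunction(name, javaClassName, returnType)` signature in the diff above. The pyspark import is guarded so the sketch degrades gracefully where pyspark and a JVM are unavailable; the class name is the test UDF from this PR.

```python
# Sketch of the API this PR adds, based on the diff quoted earlier. The
# pyspark import is guarded because the call only works against a real
# Spark/JVM setup.
try:
    from pyspark.sql.types import IntegerType
    HAVE_PYSPARK = True
except ImportError:
    HAVE_PYSPARK = False

def register_java_string_length(sql_ctx):
    """Register the Java UDF and use it from SQL, per this PR."""
    sql_ctx.registerJavaFunction(
        "javaStringLength",                            # SQL-visible name
        "test.org.apache.spark.sql.JavaStringLength",  # Java UDF1 class
        IntegerType())                                 # declared return type
    return sql_ctx.sql("SELECT javaStringLength('test')")
```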
@davies Would you mind taking a look at it? Thanks |
Test build #59615 has finished for PR 9766 at commit
I was looking at some similar stuff as part of #13571, and I was thinking that (to match the Scala API) it would be good to return the UDF object as well, so people can use it programmatically with the DataFrame API instead of being limited to using it inside of SQL queries. |
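A toy, Spark-free sketch of this suggestion: if registration returns the UDF handle, the same object can be used programmatically as well as by name inside SQL text. All names here are invented for illustration.

```python
# Toy registry (no Spark involved) illustrating why returning the handle
# from register() is useful: callers get an object they can keep using
# programmatically, not just a name resolvable inside SQL strings.
class UdfRegistry:
    def __init__(self):
        self._by_name = {}

    def register(self, name, fn):
        self._by_name[name] = fn
        return fn  # returning the handle mirrors the Scala API

    def lookup(self, name):
        return self._by_name[name]

registry = UdfRegistry()
string_length = registry.register("stringLength", lambda s: len(s))

# "SQL-style" use: resolve by name.
by_name = registry.lookup("stringLength")("test")
# "DataFrame-style" use: call the returned handle directly.
by_handle = string_length("test")
```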
 * @param className
 * @param returnType
 */
def registerJava(name: String, className: String, returnType: DataType): Unit = {
I don't know if we want to expose this API for general use - maybe make it private so that it can only be called from Python? And maybe update the scaladoc to something like "Register a Java UDF class using reflection - for use from Python".
+1 for updating the scaladoc. "Register a Java UDF class" doesn't exactly convey the meaning of this function.
Besides, there is no documentation for the parameters. It would be better to add docs for them.
+1
Where do we stand on this? I just reapplied this patch to a Spark 2.1-xxx build to get the same behaviour. |
Rebased the PR. @davies @JoshRosen Could you help review it? Thanks |
Test build #65797 has finished for PR 9766 at commit
+1 to this functionality, but also to the request to add more tests and documentation. It would also be good to comment on the idea of using SQL as a more general way to implement this. |
Test build #66599 has finished for PR 9766 at commit
"""Register a java UDF so it can be used in SQL statements.

In addition to a name and the function itself, the return type can be optionally specified.
When the return type is not given it default to a string and conversion will automatically
Where does this conversion happen? Are we sure that it works (given there are no tests that I see).
It was my mistake copying these comments over; actually there's no conversion.
def registerJava(name: String, className: String, returnType: DataType): Unit = {

  try {
    // scalastyle:off classforname
This style rule is here to prevent misuse. Is there a reason we aren't using our utility functions?
Fixed
import scala.reflect.runtime.universe.TypeTag
import scala.util.Try

import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl
Is this JVM specific? What is this being used for? Is there another way?
Fixed
Test build #66791 has finished for PR 9766 at commit
Test build #66793 has finished for PR 9766 at commit
Test build #66794 has finished for PR 9766 at commit
Test build #66796 has finished for PR 9766 at commit
Test build #66801 has finished for PR 9766 at commit
A few more comments. I think this is going to be a popular feature!
"""Register a java UDF so it can be used in SQL statements.

In addition to a name and the function itself, the return type can be optionally specified.
When the return type is not given it would infer the returnType via reflection.
nit: it's a little odd to mix "return type" with "returnType". Perhaps: "When the return type is not specified we attempt to infer it using reflection".
Fixed
/**
 * It is used for register Java UDF from PySpark
 */
public class JavaStringLength implements UDF1<String, Integer> {
Could this be moved to src/test? It would be better to not distribute it.
Fixed
@@ -414,6 +418,84 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
  //////////////////////////////////////////////////////////////////////////////////////////////

  /**
It would be nice to turn style back on here since most of this function is not auto generated.
I can turn it on, but it would make the function less readable, especially for statements like the following, which go beyond the line-length limitation:
case 14 => register(name, udf.asInstanceOf[UDF13[_, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType)
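The case statement quoted above encodes an off-by-one worth noting: a Java UDFn interface declares n input type parameters plus one return type, so 14 type arguments means UDF13. A small plain-Python sketch of that dispatch rule (names invented for illustration):

```python
# Spark's Java UDF interfaces (UDF1..UDF22) declare n input type parameters
# plus one return type, so an interface seen with k type arguments
# corresponds to UDF(k-1).
def udf_interface_for(type_argument_count):
    n_inputs = type_argument_count - 1
    if not 1 <= n_inputs <= 22:
        raise ValueError("UDFs support 1 to 22 arguments, got %d" % n_inputs)
    return "UDF%d" % n_inputs
```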
 * @param returnDataType return type of udf. If it is null, spark would try to infer
 *                       via reflection.
 */
def registerJava(name: String, className: String, returnDataType: DataType): Unit = {
Is it possible to make this non-public? I believe we do this in other cases for code only called from python.
Fixed
var returnType = returnDataType
if (returnType == null) {
  if (udfReturnType.isInstanceOf[Class[_]]) {
    returnType = udfReturnType.asInstanceOf[Class[_]].getCanonicalName match {
Can we use JavaTypeInference here?
Thanks for the hint, fixed
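For intuition, here is a plain-Python sketch of the inference step being discussed: an explicitly supplied return type wins, otherwise the UDF's declared Java return class is mapped to a Spark SQL type. The mapping table is illustrative only; Spark's real implementation delegates to JavaTypeInference.

```python
# Illustrative mapping from a Java class's canonical name to a Spark SQL
# type name; Spark's actual code delegates this to JavaTypeInference.
_JAVA_TO_SPARK = {
    "java.lang.String": "StringType",
    "java.lang.Integer": "IntegerType",
    "java.lang.Long": "LongType",
    "java.lang.Double": "DoubleType",
    "java.lang.Boolean": "BooleanType",
}

def infer_return_type(canonical_name, explicit=None):
    # Mirrors the shape of the reviewed code: a non-null explicit type
    # wins, otherwise infer from the UDF's declared Java return class.
    if explicit is not None:
        return explicit
    return _JAVA_TO_SPARK[canonical_name]
```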
Test build #66866 has finished for PR 9766 at commit
Test build #66868 has finished for PR 9766 at commit
Test build #66870 has finished for PR 9766 at commit
Thanks, merging to master. |
Many thanks for this feature! I've been using this code in production with Spark 1.6 (as patch) and never seen any issues with stability. 😄 |
Currently PySpark can only call the builtin Java UDFs, but cannot call custom Java UDFs. It would be better to allow that. Two benefits:
* Leverage the power of rich third-party Java libraries.
* Improve performance: with a Python UDF, Python daemons are started on the workers, which hurts performance.

Author: Jeff Zhang <zjffdu@apache.org>

Closes apache#9766 from zjffdu/SPARK-11775.