[SPARK-11775][PYSPARK][SQL] Allow PySpark to register Java UDF #9766


Closed
zjffdu wants to merge 5 commits into apache:master from zjffdu:SPARK-11775

Conversation

@zjffdu (Contributor) commented Nov 17, 2015

Currently PySpark can only call built-in Java UDFs, but cannot call custom Java UDFs. It would be better to allow that, for two benefits:

  • Leverage the power of rich third-party Java libraries.
  • Improve performance: with a Python UDF, Python daemons have to be started on the workers, which hurts performance (see the sketch below).
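
As a rough sketch of the API this PR proposes, assuming a hypothetical Java class com.example.udf.StringLength that implements Spark's UDF1<String, Integer> interface (the class, table, and column names are placeholders, not part of this patch):

    from pyspark.sql.types import IntegerType

    # Register a custom Java UDF by its fully qualified class name --
    # the API this PR adds. com.example.udf.StringLength is hypothetical.
    sqlContext.registerJavaFunction("strLen", "com.example.udf.StringLength", IntegerType())

    # The UDF then runs entirely in the JVM; no Python daemons are started.
    sqlContext.sql("SELECT strLen(name) FROM people").show()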

@SparkQA commented Nov 17, 2015

Test build #46078 has finished for PR 9766 at commit dd1e269.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
      • throw new IOException(s"UDF class $
      • throw new IOException(s"It is invalid to implement multiple UDF interfaces, UDF class $
      • case n => logError(s"UDF class with $
      • logError(s"Can not instantiate class $
      • case e: ClassNotFoundException => logError(s"Can not load class $

@zjffdu (Contributor, Author) commented Dec 1, 2015

Could anyone help review this? Thanks.

@SparkQA commented Jan 26, 2016

Test build #50066 has finished for PR 9766 at commit 2e17865.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zjffdu (Contributor, Author) commented Jan 26, 2016

Please test it again.

@@ -224,6 +224,10 @@ def registerFunction(self, name, f, returnType=StringType()):
udf = UserDefinedFunction(f, returnType, name)
self._ssql_ctx.udf().registerPython(name, udf._judf)

def registerJavaFunction(self, name, javaClassName, returnType):
Contributor: You'll probably want a since annotation as well as some PyDoc here.

Contributor: Also add some tests.
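
A sketch of what the requested since annotation and PyDoc might look like (the version number and wording here are assumptions, not taken from the patch):

    @since(2.1)
    def registerJavaFunction(self, name, javaClassName, returnType):
        """Register a Java UDF so it can be used in SQL statements.

        :param name: name of the UDF
        :param javaClassName: fully qualified name of the Java UDF class
        :param returnType: the return type of the registered UDF
        """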

@holdenk (Contributor) commented Jan 27, 2016

I think this could be useful, although if people are already intermixing Scala (or Java) and Python code, maybe they should just write the registration code in Java and call the register function using py4j (a sketch of that route follows below)? This could be more convenient, though. Where do you think this would be most useful?
(Also, please add tests :))
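
As a rough illustration of the py4j route described above: a minimal sketch, assuming a hypothetical Scala helper object com.example.udf.Registrator whose register method calls sqlContext.udf.register(...) on the JVM side (the helper and all names are placeholders):

    # Call the hypothetical JVM-side helper over py4j; _ssql_ctx is
    # PySpark's handle to the underlying Java SQLContext.
    sc._jvm.com.example.udf.Registrator.register(sqlContext._ssql_ctx)

    # Once registered on the JVM side, the UDF is visible from SQL.
    sqlContext.sql("SELECT strLen(name) FROM people").show()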

@zjffdu (Contributor, Author) commented Jan 27, 2016

Thanks @holdenk for reviewing this. One scenario is that the UDF has already been implemented in Java, so the user doesn't need to wrap it using py4j and register it as a Python UDF. This might be valuable when a company has its own UDF repository: each UDF can be implemented in one language (Java/Python), and PySpark users can register it without wrapping it via py4j.

@holdenk (Contributor) commented Jan 28, 2016

Oh right, I wasn't suggesting wrapping the UDF and turning it into a Python UDF (that's going to kill performance). Rather: if one has Scala/Java UDFs, is the overhead of registering them in Scala code and using them from Python (e.g. writing a function like https://github.com/sparklingpandas/sparklingpandas/blob/master/src/main/scala/com/sparklingpandas/AggregationUDFs.scala#L36 and then calling it with py4j) high enough that a wrapper to do it directly from Python is useful?

@zjffdu (Contributor, Author) commented Feb 2, 2016

Do you mean wrapping a Java/Scala UDF in Python and calling it through py4j? One concern is performance, because that way we still need to launch a Python process. And if you already have a Java/Scala UDF, why not register it directly from Python (what this ticket does) and call it using the DataFrame API?

@davies (Contributor) commented Apr 18, 2016

@zjffdu Will CREATE FUNCTION work for this case?

@zjffdu (Contributor, Author) commented Apr 19, 2016

@davies It seems that is not supported, based on my experiment.
Besides, I found two other issues:

  1. The name of the created function gets "default" prepended (probably the database name); is that expected?
  2. It seems HiveContext can only create Hive UDFs, not generic Spark UDFs. I don't think that makes sense:
Py4JJavaError: An error occurred while calling o504.select.
: java.lang.UnsupportedOperationException: Use sqlContext.udf.register(...) instead.
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.makeFunctionBuilder(SessionCatalog.scala:520)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupFunction(SessionCatalog.scala:611)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$13$$anonfun$applyOrElse$6$$anonfun$applyOrElse$35.apply(Analyzer.scala:837)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$13$$anonfun$applyOrElse$6$$anonfun$applyOrElse$35.apply(Analyzer.scala:837)
    at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:48)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$13$$anonfun$applyOrElse$6.applyOrElse(Analyzer.scala:836)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$13$$anonfun$applyOrElse$6.applyOrElse(Analyzer.scala:824)

@davies (Contributor) commented Apr 19, 2016

@zjffdu Maybe we should support registering a Java UDF using SQL (SQL users could also benefit from that).

@davies (Contributor) commented Apr 19, 2016

@zjffdu This seems useful; could you add docs and tests for it?

@zjffdu (Contributor, Author) commented Apr 19, 2016

@davies Sure, I will add tests and docs for it.

@SparkQA commented May 12, 2016

Test build #58468 has finished for PR 9766 at commit 2e17865.

  • This patch fails R style tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA commented May 12, 2016

Test build #58482 has finished for PR 9766 at commit ed275c0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@CosmoYs commented May 13, 2016

@zjffdu I'm working on a project that integrates PySpark and Java via UDFs, so this feature could be really useful. I read through your discussion, but one thing isn't clear to me: without this feature, can we still call a Java UDF in PySpark through py4j? Could you explain this in more detail? Thanks.

@zjffdu (Contributor, Author) commented May 13, 2016

Without this feature, you can only call the built-in UDFs (org.apache.spark.sql.functions), but can't register your own custom UDF. This PR allows users to register their custom Java UDFs; a sketch of the difference follows below.
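
To illustrate, a minimal sketch of what already works without the patch: built-in functions in org.apache.spark.sql.functions can be reached over py4j by unwrapping and rewrapping the JVM Column (table and column names are placeholders), while a custom Java UDF class has no such entry point:

    from pyspark.sql import Column

    df = sqlContext.table("people")

    # Built-in JVM functions are callable via py4j; df["name"]._jc is the
    # underlying JVM Column, and Column(...) wraps the JVM result again.
    jcol = sc._jvm.org.apache.spark.sql.functions.upper(df["name"]._jc)
    df.select(Column(jcol)).show()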

@zjffdu (Contributor, Author) commented May 30, 2016

@davies Would you mind taking a look at it? Thanks

@SparkQA commented May 30, 2016

Test build #59615 has finished for PR 9766 at commit 5feb2e4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@holdenk (Contributor) commented Jul 22, 2016

I was looking at some similar stuff as part of #13571, and I was thinking that (to match the Scala API) it would be good to return the UDF object as well, so people can use it programmatically with the DataFrame API instead of being limited to using it inside SQL queries.
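
Until the UDF object is returned, a UDF registered by name can still be used from the DataFrame API via expr() or selectExpr(), e.g. (reusing the placeholder names from the sketches above):

    from pyspark.sql.functions import expr

    # A registered UDF is reachable by name from the DataFrame API,
    # without writing a full SQL query.
    df.select(expr("strLen(name)").alias("name_len")).show()
    df.selectExpr("strLen(name) AS name_len").show()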

@holdenk (Contributor) commented Aug 3, 2016

Does @davies perhaps have bandwidth to look at this? (Also, @zjffdu, maybe consider merging in master? In the spark-pr dashboard this is shown as unable to merge against master, even though GitHub shows no conflicts.)

* @param className
* @param returnType
*/
def registerJava(name: String, className: String, returnType: DataType): Unit = {
Contributor: I don't know if we want to expose this API for general use - maybe make it private so that it can only be called from Python? And maybe update the scaladoc to something like "Register a Java UDF class using reflection - for use from Python".

@viirya (Member) commented Oct 9, 2016: +1 for updating the scaladoc. "Register a Java UDF class" doesn't exactly convey the meaning of this function.

Member: Besides, there is no documentation for the parameters. It would be better to add docs for them.

Contributor: +1

@GregBowyer commented:

Where do we stand on this? I just reapplied this patch to a Spark 2.1-xxx build to get the same behaviour.

@zjffdu (Contributor, Author) commented Sep 22, 2016

Rebased the PR. @davies @JoshRosen, could you help review it? Thanks

@SparkQA commented Sep 23, 2016

Test build #65797 has finished for PR 9766 at commit dc31d78.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@marmbrus (Contributor) commented Oct 7, 2016

+1 to this functionality, but also to the request to add more tests and documentation. It would also be good to comment on the idea of using SQL as a more general way to implement this.

@SparkQA commented Oct 9, 2016

Test build #66599 has finished for PR 9766 at commit 93d565c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

"""Register a java UDF so it can be used in SQL statements.

In addition to a name and the function itself, the return type can be optionally specified.
When the return type is not given it default to a string and conversion will automatically
Contributor: Where does this conversion happen? Are we sure that it works (given that there are no tests that I can see)?

Contributor (Author): That was my mistake in copying these comments; actually there's no conversion.

def registerJava(name: String, className: String, returnType: DataType): Unit = {

try {
// scalastyle:off classforname
Contributor: This style rule is here to prevent misuse. Is there a reason we aren't using our utility functions?

Contributor (Author): Fixed

import scala.reflect.runtime.universe.TypeTag
import scala.util.Try

import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl
Contributor: Is this JVM specific? What is this being used for? Is there another way?

Contributor (Author): Fixed

@SparkQA commented Oct 12, 2016

Test build #66791 has finished for PR 9766 at commit 9de8c0e.

  • This patch fails RAT tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 12, 2016

Test build #66793 has finished for PR 9766 at commit dc6d5f9.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 12, 2016

Test build #66794 has finished for PR 9766 at commit 45a9b7a.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 12, 2016

Test build #66796 has finished for PR 9766 at commit e9832f6.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 12, 2016

Test build #66801 has finished for PR 9766 at commit d481821.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@marmbrus (Contributor) left a review: A few more comments. I think this is going to be a popular feature!

"""Register a java UDF so it can be used in SQL statements.

In addition to a name and the function itself, the return type can be optionally specified.
When the return type is not given it would infer the returnType via reflection.
Contributor: nit: it's a little odd to mix "return type" with "returnType". Perhaps: "When the return type is not specified we attempt to infer it using reflection".

Contributor (Author): Fixed
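
With the inference in place, the return type argument can presumably be omitted and recovered from the UDF's generic signature, along these lines (class and names remain the placeholders used above):

    # returnType omitted: inferred via reflection from UDF1<String, Integer>.
    sqlContext.registerJavaFunction("strLen", "com.example.udf.StringLength")
    sqlContext.sql("SELECT strLen(name) FROM people").show()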

/**
 * It is used for registering a Java UDF from PySpark.
 */
public class JavaStringLength implements UDF1<String, Integer> {
  @Override
  public Integer call(String str) { return str.length(); }
}
Contributor: Could this be moved to src/test? It would be better to not distribute it.

Contributor (Author): Fixed

@@ -414,6 +418,84 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
//////////////////////////////////////////////////////////////////////////////////////////////

/**
@marmbrus (Contributor) commented Oct 12, 2016: It would be nice to turn style back on here, since most of this function is not auto-generated.

Contributor (Author): I can turn it on, but it would make the function less readable, especially for statements like the following, which go beyond the line-length limit.

case 14 => register(name, udf.asInstanceOf[UDF13[_, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType)

* @param returnDataType return type of udf. If it is null, spark would try to infer
* via reflection.
*/
def registerJava(name: String, className: String, returnDataType: DataType): Unit = {
Contributor: Is it possible to make this non-public? I believe we do this in other cases for code only called from Python.

Contributor (Author): Fixed

var returnType = returnDataType
if (returnType == null) {
if (udfReturnType.isInstanceOf[Class[_]]) {
returnType = udfReturnType.asInstanceOf[Class[_]].getCanonicalName match {
Contributor: Can we use JavaTypeInference here?

Contributor (Author): Thanks for the hint, fixed.

@SparkQA commented Oct 13, 2016

Test build #66866 has finished for PR 9766 at commit 18fa6e3.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 13, 2016

Test build #66868 has finished for PR 9766 at commit 00f65cd.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 13, 2016

Test build #66870 has finished for PR 9766 at commit 8171b85.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@marmbrus (Contributor) commented:
Thanks, merging to master.

@asfgit closed this in f00df40 on Oct 14, 2016
@Orhideous commented:
Many thanks for this feature! I've been using this code in production with Spark 1.6 (as a patch) and have never seen any stability issues. 😄

robert3005 pushed a commit to palantir/spark that referenced this pull request Nov 1, 2016 (Author: Jeff Zhang <zjffdu@apache.org>; Closes apache#9766 from zjffdu/SPARK-11775).
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017.