[SPARK-35384][SQL] Improve performance for InvokeLike.invoke #32527
|
Kubernetes integration test starting |
|
Kubernetes integration test status failure |
maropu
left a comment
Looks fine otherwise.
| var i = 0 | ||
| val len = arguments.length | ||
| while (i < len) { | ||
| val e = arguments(i) |
It looks like we don't need this intermediate val?
evaluatedArgs(i) = arguments(i).eval(input).asInstanceOf[Object]
yea let me remove it
| evaluatedArgs(i) = e.eval(input).asInstanceOf[Object] | ||
| i += 1 | ||
| } | ||
| if (needNullCheck && evaluatedArgs.exists(_ == null)) { |
nit: my IDE suggests .exists(_ == null) -> .contains(null)
the exists part is not related to this PR but I'm happy to change it :)
| var i = 0 | ||
| val len = arguments.length | ||
| while (i < len) { | ||
| evaluatedArgs(i) = arguments(i).eval(input).asInstanceOf[Object] |
Why do we need to keep evaluatedArgs as a (lazy) val?
You mean just use val?
Don't we evaluate the arguments each time invoke is called? Why not just have val evaluatedArgs: Array[Object] = new Array[Object](arguments.length) in invoke?
I guess it aims to reuse Array[Object] itself and only changes the values of array.
Yea even though we evaluate arguments for each invoke call we can reuse the same array to store the results of evaluation. I guess it's better than allocating a new Array[Object] for each input row.
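The trade-off discussed above can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: Expr and ArgEvaluator are hypothetical stand-ins for Spark's Expression and InvokeLike, and only the while loop and the reused evaluatedArgs array mirror the diff.

```scala
// Hypothetical stand-in for a Spark expression; not Spark's Expression class.
trait Expr {
  def eval(input: Any): Any
}

// Hypothetical helper mirroring the pattern in the diff.
final class ArgEvaluator(arguments: IndexedSeq[Expr]) {
  // Allocated once, on first use, and then reused for every input row.
  private lazy val evaluatedArgs: Array[Object] =
    new Array[Object](arguments.length)

  // Overwrites the shared buffer with the arguments evaluated for this row.
  def evaluate(input: Any): Array[Object] = {
    var i = 0
    val len = arguments.length
    while (i < len) {
      evaluatedArgs(i) = arguments(i).eval(input).asInstanceOf[Object]
      i += 1
    }
    evaluatedArgs
  }
}
```

Because every call returns the same array instance, a caller in this sketch must consume the values before the next call, and the helper is not safe to share across threads.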
|
Kubernetes integration test starting |
|
Kubernetes integration test status failure |
dongjoon-hyun
left a comment
+1, LGTM. The improvement looks nice!
|
Test build #138475 has finished for PR 32527 at commit
|
|
Thank you, @sunchao and all! Merged to master for Apache Spark 3.2.0. |
| val ret = try { | ||
| method.invoke(obj, args: _*) | ||
| method.invoke(obj, evaluatedArgs: _*) | ||
| } catch { |
Can we also improve the last piece?
val boxedClass = ScalaReflection.typeBoxedJavaMapping.get(dataType)
if (boxedClass.isDefined) {
  boxedClass.get.cast(ret)
} else {
  ret
}
We can create a function for it
private lazy val boxing: Any => Any = ScalaReflection.typeBoxedJavaMapping.get(dataType).map(_.cast(_)).getOrElse(identity)
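Spelled out with explicit lambdas, the idea of resolving the cast once might look like the sketch below. It is an assumption-laden illustration: BoxingSketch, boxedClasses, and the string keys are hypothetical stand-ins for ScalaReflection.typeBoxedJavaMapping and dataType, chosen so the example runs without Spark.

```scala
object BoxingSketch {
  // Hypothetical stand-in for ScalaReflection.typeBoxedJavaMapping,
  // keyed by plain strings instead of Spark DataTypes.
  val boxedClasses: Map[String, Class[_]] = Map(
    "int" -> classOf[java.lang.Integer],
    "long" -> classOf[java.lang.Long]
  )

  // Does the map lookup once and returns a function that can be cached,
  // so the per-row path is a single function call.
  def boxingFor(typeName: String): Any => Any =
    boxedClasses.get(typeName) match {
      case Some(cls) => (v: Any) => cls.cast(v.asInstanceOf[AnyRef])
      case None => (v: Any) => v
    }
}
```

A caller would store the result of boxingFor in a field (for example a lazy val) and apply it to each return value, rather than calling boxingFor per row.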
We can do a similar thing in Invoke.eval
Yea, let me try it. In the profiling after this PR, HashMap.get takes 7.82% of the entire invoke call, so it seems worthwhile to do this.
I'm not sure we can do a similar thing in Invoke.eval though, since obj in obj.getClass.getMethod(functionName, argClasses: _*) is different for each call.
You are right. Another idea: obj from InternalRow is always of the same class, so we can avoid this:
@transient lazy val method = {
  val cls = targetObject.dataType match {
    case ObjectType(cls) => cls
    case StringType => classOf[UTF8String]
    case _: DecimalType => classOf[Decimal]
    ...
  }
  findMethod(cls, encodedFunctionName, argClasses)
}
Hmm I'm not sure. Looking at usages of Invoke, it seems targetObject.dataType is usually ObjectType (for instance, in ScalarFunction we wrap the UDF into a Literal with ObjectType), so curious how useful this would be and when we'd use StringType/DecimalType for the targetObject.
Looking at the profiling result for Invoke.eval, it is now dominated by InvokeLike.invoke:
Although this is somewhat unrelated to the above as V2FunctionBenchmark (and ScalarFunction) uses ObjectType for Invoke so it's already handled by the current code:
@transient lazy val method = targetObject.dataType match {
  case ObjectType(cls) =>
    Some(findMethod(cls, encodedFunctionName, argClasses))
  case _ => None
}
We may need new benchmarks if we decide to do this.
Makes sense. For UDF, it's just an extra method.isDefined check, and probably not a big issue.
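For readers following the thread, the "resolve once if the class is known, otherwise look up per call" pattern can be sketched with plain Java reflection. MethodInvoker and its staticClass parameter are hypothetical; in the snippets above the decision is keyed on targetObject.dataType and the lookup goes through findMethod.

```scala
import java.lang.reflect.Method

// Hypothetical helper; not part of Spark.
final class MethodInvoker(
    staticClass: Option[Class[_]],
    name: String,
    argClasses: Seq[Class[_]]) {

  // Resolved once when the target class is known up front, otherwise None.
  private lazy val method: Option[Method] =
    staticClass.map(_.getMethod(name, argClasses: _*))

  def invoke(obj: AnyRef, args: Array[Object]): Object = {
    // The extra isDefined check is the only per-call cost on the cached path.
    val m =
      if (method.isDefined) method.get
      else obj.getClass.getMethod(name, argClasses: _*)
    m.invoke(obj, args: _*)
  }
}
```

With Some(classOf[String]) the reflective lookup happens once; with None it is repeated on every call, which is the cost the thread is weighing.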
|
Test build #138480 has finished for PR 32527 at commit
|

What changes were proposed in this pull request?
Change map in InvokeLike.invoke to a while loop to improve performance, following the Spark style guide.
Why are the changes needed?
InvokeLike.invoke, which is used in the non-codegen path for Invoke and StaticInvoke, currently uses map to evaluate arguments, which is pretty expensive if the method itself is trivial. We can change it to a plain while loop.
Benchmark results from V2FunctionBenchmark show this can improve performance by as much as 3x:
Before
After
Does this PR introduce any user-facing change?
No
How was this patch tested?
Existing tests.