[SPARK-10714][SPARK-8632][SPARK-10685][SQL] Refactor Python UDF handling #8835
Conversation
Force-pushed from 62fdea2 to 8bfe6c4.
cc @davies
Test build #42712 has finished for PR 8835 at commit.
Could you try to re-use the code in Python UDF?
@davies I've updated BatchPythonEvaluation too.
@@ -342,51 +348,57 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child:
   override def canProcessSafeRows: Boolean = true

   protected override def doExecute(): RDD[InternalRow] = {
-    val childResults = child.execute().map(_.copy())
+    val inputRDD = child.execute()
I think we should keep the copy() here.
good idea
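The copy() matters because Spark's operators often reuse a single mutable row object across an iterator, so buffering rows without copying leaves the buffer full of references to one object. A minimal Python sketch of the pitfall (illustrative only, not Spark code; the names are invented for the example):

```python
def row_iterator_with_reuse(values):
    """Yields the same mutable 'row' object for every element,
    mimicking an iterator that reuses its row buffer."""
    row = [None]  # a single mutable row, reused for every element
    for v in values:
        row[0] = v
        yield row

# Buffering without copying: every entry is the same object,
# so after iteration they all hold the last value.
buffered = list(row_iterator_with_reuse([1, 2, 3]))
assert [r[0] for r in buffered] == [3, 3, 3]

# Copying each row before buffering preserves the distinct values.
buffered = [list(r) for r in row_iterator_with_reuse([1, 2, 3])]
assert [r[0] for r in buffered] == [1, 2, 3]
```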
Test build #42719 has finished for PR 8835 at commit.
Test build #42724 has finished for PR 8835 at commit.
 * For each row we send to Python, we also put it in a queue. For each output row from Python,
 * we drain the queue to find the original input row. Note that if the Python process is way too
 * slow, this could lead to the queue growing unbounded and eventually running out of memory.
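The queue-based pairing described in the comment can be sketched as follows (an illustrative Python sketch, not the actual Spark code; evaluate_with_queue and the single-column layout are assumptions for the example):

```python
from collections import deque

def evaluate_with_queue(input_rows, udf):
    """For each row fed to the worker, remember the original in a queue;
    for each result that comes back (in order), drain the queue to rejoin
    the result with its originating input row."""
    queue = deque()

    def feed():
        for row in input_rows:
            queue.append(row)   # remember the original input row
            yield row[0]        # value sent to the worker (column 0 here)

    results = map(udf, feed())  # stand-in for the Python worker process

    for result in results:
        original = queue.popleft()   # drain to find the matching input
        yield original + (result,)

rows = [(1,), (2,), (3,)]
assert list(evaluate_with_queue(rows, lambda x: x * 10)) == [(1, 10), (2, 20), (3, 30)]
```

If the worker buffers many inputs before emitting any output, the queue holds every in-flight row, which is exactly the unbounded-growth risk the comment warns about.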
Could we mitigate this by using a LinkedBlockingDeque to have the producer-side block on inserts once the queue grows to a certain size?
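The suggested mitigation is a bounded queue whose put operation blocks once a capacity limit is reached, so the producer throttles instead of growing the buffer without limit. A rough Python analogue of that behavior, using queue.Queue in place of a LinkedBlockingDeque (the bound of 2 is an arbitrary example value):

```python
import queue
import threading

MAX_IN_FLIGHT = 2  # illustrative capacity bound
inflight = queue.Queue(maxsize=MAX_IN_FLIGHT)  # rough analogue of a bounded LinkedBlockingDeque
consumed = []

def consumer():
    # Drains rows; a None sentinel signals the end of input.
    while True:
        row = inflight.get()
        if row is None:
            break
        consumed.append(row)

t = threading.Thread(target=consumer)
t.start()
for row in range(10):
    inflight.put(row)  # blocks whenever MAX_IN_FLIGHT rows are already pending
inflight.put(None)     # sentinel: no more rows
t.join()
assert consumed == list(range(10))
```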
Per discussion offline, the only scenario where the queue can grow really large is when the Python buffer size has been configured to be very large and the UDF result rows are very small. As a result, I think that this comment should be expanded / clarified, but this can take place in a followup PR.
Based on some offline discussion / debate, we've decided to merge this patch into both master and branch-1.5. I'm going to merge this now.
[SPARK-10714][SPARK-8632][SPARK-10685][SQL] Refactor Python UDF handling

This patch refactors Python UDF handling:
1. Extract the per-partition Python UDF calling logic from PythonRDD into a PythonRunner. PythonRunner itself expects iterator as input/output, and thus has no dependency on RDD. This way, we can use PythonRunner directly in a mapPartitions call, or in the future in an environment without RDDs.
2. Use PythonRunner in Spark SQL's BatchPythonEvaluation.
3. Updated BatchPythonEvaluation to only use its input once, rather than twice. This should fix Python UDF performance regression in Spark 1.5.

There are a number of small cleanups I wanted to do when I looked at the code, but I kept most of those out so the diff looks small.

This basically implements the approach in #8833, but with some code moving around so the correctness doesn't depend on the inner workings of Spark serialization and task execution.

Author: Reynold Xin <rxin@databricks.com>

Closes #8835 from rxin/python-iter-refactor.

(cherry picked from commit a96ba40)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
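The core idea of point 1 — per-partition logic that consumes and produces plain iterators, with no RDD dependency — can be sketched as follows (a hedged illustration; IteratorRunner is an invented name, not Spark's PythonRunner API):

```python
class IteratorRunner:
    """Per-partition logic that maps an input iterator to an output
    iterator. Because only iterators cross the boundary, the runner has
    no dependency on RDDs and can be used standalone or inside a
    mapPartitions-style call."""

    def __init__(self, func):
        self.func = func

    def compute(self, input_iterator):
        for item in input_iterator:
            yield self.func(item)

runner = IteratorRunner(lambda x: x + 1)

# Standalone use (no RDD machinery involved):
assert list(runner.compute(iter([1, 2, 3]))) == [2, 3, 4]

# mapPartitions-style use: each partition is just an iterator.
partitions = [iter([1, 2]), iter([3, 4])]
assert [list(runner.compute(p)) for p in partitions] == [[2, 3], [4, 5]]
```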