
[SPARK-10714][SPARK-8632][SPARK-10685][SQL] Refactor Python UDF handling #8835


Closed
wants to merge 4 commits into master from rxin:python-iter-refactor

Conversation


@rxin rxin commented Sep 19, 2015

This patch refactors Python UDF handling:

  1. Extract the per-partition Python UDF calling logic from PythonRDD into a PythonRunner. PythonRunner takes an iterator as input and produces an iterator as output, and thus has no dependency on RDD. This way, we can use PythonRunner directly in a mapPartitions call (see the sketch below this description), or in the future in an environment without RDDs.
  2. Use PythonRunner in Spark SQL's BatchPythonEvaluation.
  3. Update BatchPythonEvaluation to consume its input only once, rather than twice. This should fix the Python UDF performance regression in Spark 1.5.

There are a number of small cleanups I wanted to do when I looked at the code, but I kept most of those out so the diff looks small.

This basically implements the approach in #8833, but with some code moved around so that correctness doesn't depend on the inner workings of Spark serialization and task execution.
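
To make the decoupling concrete, here is a minimal Scala sketch of how a runner that only consumes and produces iterators could be driven from a mapPartitions call. The PythonRunner import path, the compute() signature, and the helper name evalWithPython are illustrative assumptions based on the description above, not the exact merged API.

    import org.apache.spark.TaskContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.api.python.PythonRunner  // assumed package

    // Drive a PythonRunner from an ordinary per-partition transformation.
    // Because the runner only sees iterators, it carries no RDD dependency.
    def evalWithPython(
        input: RDD[Array[Byte]],
        runner: PythonRunner): RDD[Array[Byte]] = {
      input.mapPartitionsWithIndex { (partitionIndex, iter) =>
        runner.compute(iter, partitionIndex, TaskContext.get())
      }
    }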

@rxin force-pushed the python-iter-refactor branch from 62fdea2 to 8bfe6c4 on September 19, 2015 06:53
@rxin changed the title from "[WIP] Refactor PythonRDD to decouple iterator computation from PythonRDD." to "[SPARK-10714] Refactor PythonRDD to decouple iterator computation from PythonRDD." on Sep 19, 2015

rxin commented Sep 19, 2015

cc @davies


SparkQA commented Sep 19, 2015

Test build #42712 has finished for PR 8835 at commit 8bfe6c4.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class WriterThread(


davies commented Sep 19, 2015

Could you try to re-use the code in Python UDF?


rxin commented Sep 20, 2015

@davies I've updated BatchPythonEvaluation too.

@rxin changed the title from "[SPARK-10714] Refactor PythonRDD to decouple iterator computation from PythonRDD." to "[SPARK-10714][SPARK-8632][SPARK-10685] Refactor Python UDF execution" on Sep 20, 2015
@rxin changed the title from "[SPARK-10714][SPARK-8632][SPARK-10685] Refactor Python UDF execution" to "[SPARK-10714][SPARK-8632][SPARK-10685][SQL] Refactor Python UDF handling" on Sep 20, 2015
@@ -342,51 +348,57 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child:
override def canProcessSafeRows: Boolean = true

protected override def doExecute(): RDD[InternalRow] = {
-    val childResults = child.execute().map(_.copy())
+    val inputRDD = child.execute()
I think we should keep the copy() here.

@rxin: good idea
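
For context on why copy() matters here: Spark SQL iterators may return the same mutable InternalRow instance from every next() call, mutating it in place, so an operator that buffers its input (as BatchPythonEvaluation does with its queue) must copy each row first. A minimal sketch with hypothetical names:

    import scala.collection.mutable
    import org.apache.spark.sql.catalyst.InternalRow

    val inputBuffer = mutable.Queue[InternalRow]()

    // Buffer a row for later matching with the corresponding Python output.
    // Without copy(), every queued entry would alias one reused row object
    // and silently reflect whatever row the iterator produced last.
    def bufferRow(row: InternalRow): Unit = {
      inputBuffer += row.copy()
    }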


SparkQA commented Sep 20, 2015

Test build #42719 has finished for PR 8835 at commit 8d3c495.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class WriterThread(


SparkQA commented Sep 20, 2015

Test build #42724 has finished for PR 8835 at commit 5e55bf6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class WriterThread(

*
* For each row we send to Python, we also put it in a queue. For each output row from Python,
* we drain the queue to find the original input row. Note that if the Python process is way too
* slow, this could lead to the queue growing unbounded and eventually exhausting memory.

Could we mitigate this by using a LinkedBlockingDeque to have the producer-side block on inserts once the queue grows to a certain size?


Per discussion offline, the only scenario where the queue can grow really large is when the Python buffer size has been configured to be very large and the UDF result rows are very small. As a result, I think that this comment should be expanded / clarified, but this can take place in a followup PR.
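
A minimal sketch of the bounded-queue mitigation suggested above (not the code merged in this PR; the capacity value and helper names are assumptions). A bounded deque makes the writer thread block once the number of in-flight rows reaches the capacity, instead of letting the buffer grow without bound:

    import java.util.concurrent.LinkedBlockingDeque
    import org.apache.spark.sql.catalyst.InternalRow

    val capacity = 1024  // assumed tuning knob
    val inFlightRows = new LinkedBlockingDeque[InternalRow](capacity)

    // Producer side (writer thread): blocks once `capacity` rows are queued.
    def enqueue(row: InternalRow): Unit = inFlightRows.put(row.copy())

    // Consumer side: blocks until the matching input row is available.
    def dequeue(): InternalRow = inFlightRows.take()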

@JoshRosen

Based on some offline discussion / debate, we've decided to merge this patch into both master and branch-1.5. I'm going to merge this now.

@asfgit closed this in a96ba40 on Sep 22, 2015
asfgit pushed a commit that referenced this pull request Sep 22, 2015
[SPARK-10714][SPARK-8632][SPARK-10685][SQL] Refactor Python UDF handling

Author: Reynold Xin <rxin@databricks.com>

Closes #8835 from rxin/python-iter-refactor.

(cherry picked from commit a96ba40)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>