[SPARK-8632] [SQL] [PYSPARK] Poor Python UDF performance because of RDD caching #8662
Conversation
- I wanted to reuse most of the logic from PythonRDD, so I pulled out two methods, writeHeaderToStream and readPythonProcessSocket
- The worker.py now has a switch where it reads an int that either tells it to go into normal pyspark RDD mode, which is meant for a streaming two-thread workflow, or into pyspark UDF mode, which is meant to be called synchronously
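A rough sketch of the dispatch being described, purely for illustration; the mode tags, helper names, and loop bodies below are assumptions, not the actual worker.py symbols from this patch:

```python
import struct

# Assumed tag values; the real protocol constants in the patch may differ.
RDD_MODE = 0
UDF_MODE = 1


def read_int(stream):
    # 4-byte big-endian int, matching the framing PySpark uses between the JVM and the worker.
    return struct.unpack(">i", stream.read(4))[0]


def run_rdd_loop(infile, outfile):
    # Normal pyspark RDD mode: streaming, two-thread workflow where the JVM writer
    # thread keeps pushing data while results are read back asynchronously.
    pass


def run_udf_loop(infile, outfile):
    # UDF mode: called synchronously, one request/response round trip per batch.
    pass


def main(infile, outfile):
    mode = read_int(infile)
    if mode == RDD_MODE:
        run_rdd_loop(infile, outfile)
    elif mode == UDF_MODE:
        run_udf_loop(infile, outfile)
```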
Jenkins, ok to test. |
Should the build have started by now? |
Yes - Mr. Jenkins doesn't like me anymore. |
Test build #1731 has finished for PR 8662 at commit
|
@justinuang Thanks for working on this, do you have any numbers on the performance improvement? |
Hey davies, I don't have any numbers. Are there any benchmarks that we can just rerun? |
Unfortunately, we haven't done any benchmarks for Python UDFs yet, can you do one or two simple cases? |
Is there an example of another benchmark? I'm not sure where they're stored for Python |
I think something like this could be enough:
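(The snippet itself isn't preserved in this thread. A minimal benchmark along these lines, with a trivial Python UDF over a generated range, might look like the sketch below; the data size and names are made up for illustration.)

```python
import time

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf, col
from pyspark.sql.types import LongType

sc = SparkContext(appName="python-udf-benchmark")
sqlContext = SQLContext(sc)

# A trivial Python UDF, so the timing is dominated by the UDF plumbing
# rather than by the function body itself.
plus_one = udf(lambda x: x + 1, LongType())

df = sqlContext.range(1 << 22)

start = time.time()
# Aggregate the UDF output so every row is actually evaluated.
df.select(plus_one(col("id")).alias("x")).agg({"x": "sum"}).collect()
print("1 UDF: %.3f s" % (time.time() - start))
```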
|
Test build #1737 has finished for PR 8662 at commit
|
Sorry for the delay, here is the code I ran and here are the results
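(The exact script and timings aren't preserved in this thread. As a hypothetical reconstruction of the kind of scaling test described, one can chain an increasing number of Python UDF calls, so that without the fix each additional level forces recomputation of an upstream plan that already contains Python UDFs:)

```python
import time

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf, col
from pyspark.sql.types import LongType

sc = SparkContext(appName="nested-udf-benchmark")
sqlContext = SQLContext(sc)

plus_one = udf(lambda x: x + 1, LongType())
df = sqlContext.range(1 << 20)

for n in range(5):
    expr = col("id")
    # Chain n Python UDF calls; each level adds another Python evaluation step
    # whose upstream gets recomputed without the fix.
    for _ in range(n):
        expr = plus_one(expr)
    start = time.time()
    df.select(expr.alias("x")).agg({"x": "sum"}).collect()
    print("%d UDFs: %.3f s" % (n, time.time() - start))
```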
The results are as expected. The Python overhead is about 1.5 seconds, but you can see how the time becomes exponential without the fix, since the cost of calculating the upstream twice includes the expensive Python operations themselves.
|
👍 |
@justinuang Thanks for the numbers, they look interesting. The result with no UDFs looks strange, could you run each query twice and use the best (or second) result? |
Looks like your intuition was right. The second time it's slightly faster, so I ran the loop twice and took the second run's numbers. Here are the updated numbers:
It does look like there's a tiny performance drop for one UDF. My guess is that it's slightly slower because the initial approach was slightly cheating with CPU time: it had three threads that could do serialization and deserialization work at the same time. However, that breaks the RDD abstraction that each partition should only get one thread to do CPU work. |
Jenkins, retest this please |
Test build #1765 has finished for PR 8662 at commit
|
@davies how do I make a class private in Python? In addition, is it possible that the failing unit test is flaky? I ran
and it succeeds locally. |
@justinuang This patch works pretty well for multiple UDFs, but I have two concerns before reviewing the details: 1) it has some overhead for each batch, causing some regression for a single UDF; 2) there are lots of changes in PythonRDD and worker.py, and Python UDFs now work differently than other PythonRDDs, which increases the complexity (and the potential for bugs). There is another approach, as we discussed before, using a better cache for the upstream RDD, which could be called CacheOnceRDD: it appends all the rows into an array when compute() is called for the first time, then pulls and removes the rows when compute() is called the second time. This CachedOnceRDD should work similarly to UnsafeExternalSorter (spilling to disk if there is not enough memory). I think this approach should have better performance than the current approach without changing PythonRDD (which is already very complicated). I really want to try this out, but haven't had time to work on it. |
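A single-threaded Python sketch of the consume-once idea, purely to illustrate its shape (the actual proposal is a Scala RDD, the names here are made up, and it glosses over the two-thread ordering issue discussed further below):

```python
from collections import deque


class ConsumeOnceBuffer(object):
    """Hypothetical illustration: the first call to compute() tees rows into a
    buffer as they are consumed; the second call drains the buffer, removing
    rows as it goes, so memory is released as the rows are re-read."""

    def __init__(self, upstream_iterator):
        self._upstream = upstream_iterator
        self._buffer = deque()
        self._first_taken = False

    def compute(self):
        if not self._first_taken:
            self._first_taken = True
            return self._tee()
        return self._drain()

    def _tee(self):
        for row in self._upstream:
            self._buffer.append(row)      # a real version would spill to disk under memory pressure
            yield row

    def _drain(self):
        while self._buffer:
            yield self._buffer.popleft()  # pull and remove, freeing memory as we go


# Usage sketch: the first pass feeds the Python worker, the second pass is
# zipped with the results coming back.
buf = ConsumeOnceBuffer(iter(range(5)))
first_pass = list(buf.compute())    # [0, 1, 2, 3, 4], also fills the buffer
second_pass = list(buf.compute())   # [0, 1, 2, 3, 4], buffer drained as it is read
```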
Hey @davies - I was not involved with Python UDFs earlier, but why does calling UDFs require caching? Isn't it really bad if the partition is large? It doesn't make a lot of sense to me that we would need to materialize the entire partition of data to call UDFs. |
Hey davies, I think the performance regression for a single UDF may be because there were multiple threads per task that could potentially be taking up CPU time. I highly doubt that the actual IO over loopback adds much time compared to the time spent deserializing and serializing the individual items in the row. The other approach of passing the entire row could potentially be okay, and it doesn't add a lot of changes to PythonRDD and Python UDFs, but I'm afraid that the cost of serializing the entire row can be prohibitive. After all, isn't serialization from in-memory JVM types to the pickled representation the most expensive part? What if I have a giant row of 100 columns, and I only want to run a UDF on one column? Do I need to serialize the entire row to pickle? |
Test build #1766 has finished for PR 8662 at commit
|
Here are some benchmarks with different approaches:
It seems that passing the row into Python is slower than the current approach (and will be much slower with wider tables). Using a cache is faster than the current approach because of the asynchronous pipeline, and CachedOnceRDD is faster than the RDD cache while also needing less memory. The CachedOnceRDD looks like this:
And we need to change this
|
If I'm reading it correctly, this is not "appends all the rows into an array when compute() is called for the first time, then pulls and removes the rows when compute() is called the second time"? |
Actually never mind - it does. This seems a lot more complicated still - we would need to handle spilling, and then in that case it would strictly be slower due to the unnecessary spill. |
OK I finally understood what's going on here. If I understand your intention correctly, you are assuming:
So the memory consumption is somewhat bounded. If you are doing this, can't you simplify this a lot by removing all the synchronization, the owner thing, and the RDD, and just in the mapPartitions call:
You mentioned earlier there might be a chance of deadlock. I don't understand why it would deadlock. But even if it could deadlock with the above approach, you can change it slightly to achieve the same thing as what you have, by doing the following in the mapPartitions call:
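(The snippets themselves aren't preserved in this thread. As a rough, hypothetical illustration of the buffer-then-zip idea, written here in Python rather than the Scala mapPartitions code where the real change would live, with evaluate_udf_batch standing in for the synchronous round trip to the Python worker:)

```python
def eval_udf_on_partition(rows, evaluate_udf_batch):
    # Materialize the partition once, so the upstream iterator is never consumed twice.
    buffered = list(rows)

    # Ship all inputs to the Python worker, then read all results back; keeping the
    # two phases strictly ordered is one way to sidestep buffering-related deadlocks.
    results = evaluate_udf_batch(buffered)

    # Zip the buffered original rows with the UDF outputs to build the output rows.
    for row, result in zip(buffered, results):
        yield row + (result,)


# Toy usage with a fake "worker" that squares the first column of each row:
out = list(eval_udf_on_partition([(1,), (2,), (3,)],
                                 lambda batch: [r[0] * r[0] for r in batch]))
# out == [(1, 1), (2, 4), (3, 9)]
```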
|
BTW, with all of this I'm starting to think we should just do a quick fix for 1.5.1, and then for 1.6 we would need to rewrite a lot of this in order to use local iterators anyway. For local iterators, we wouldn't reuse the existing PythonRDD, so the goal of keeping only one code path is probably moot. |
The solution with the iterator wrapper was my first approach, which I prototyped (http://apache-spark-developers-list.1001551.n3.nabble.com/Python-UDF-performance-at-large-scale-td12843.html). It's dangerous because there is buffering at many levels, which can run into a deadlock situation.
I'm not sure that this UDF performance regression for one UDF is going to hit many people. For one, most upstreams are not a range() call, which doesn't have to go back to disk and deserialize. My personal opinion is that the blocking performance shouldn't be the reason we reject this approach; if anything, the reason should be that it adds complexity. If we want a quick fix that is safe, I would be in favor of passing the row, which is indeed slower, but better than deadlocking or calculating the upstream twice. It's just that the current system is unacceptable. Maybe we can also consider a complete architecture shift toward a batching system that uses Thrift to serialize the Scala types to a language-agnostic format and also handles the blocking RPC. Then we can have PySpark and SparkR using the same simple UDF architecture. The main drawback is that I'm not sure how we're going to support broadcast variables or aggregators, but should those even be supported with UDFs?
EDIT: On second thought, the cost of serializing massive rows is prohibitive (we have tables with several hundred columns where I work). It feels like the approach in this PR, a complete rearchitecture with Thrift to clean things up, or some clever and well-tested flushing system are the viable options right now |
@rxin what do you mean by local iterators =) I feel like I'm missing some context that you guys have |
This is the ticket: https://issues.apache.org/jira/browse/SPARK-9983 |
@rxin The reason CachedOnceRDD looks a little more complicated than expected is that the order of the two callers (the zip and the writer thread) is undefined (they are in two threads); usually the zip will lag behind the writer thread, but the zip could call compute() before the writer thread does. This avoids the deadlock without doing any flushing in the PythonRDD pipeline. The buffered rows are bounded by the number of rows in PythonRDD, so we could skip spilling here (there is still a chance of OOM, but it is small). For 1.5.1, the goals are:
Which one should we pick for 1.5.1? |
I'm not sure there is a solution that satisfies all the requirements. I can say that this approach addresses 1, 2, and 4 by design.
EDIT: If we can't find a solution that satisfies 1, 2, 3, and 4, then I'm okay with us not merging something for 1.5.1. I do think that we should merge something that addresses 1, 2, and 4 into 1.6.0. Would you guys support a 1.6.0 UDF implementation that uses Thrift for the RPC and serialization? In general, I find the custom-rolled socket, serialization, and cleanup approach pretty scary. Those are already solved problems, and then we could support multiple language bindings at the DataFrame level, where I think it's a lot easier to implement. We could even support broadcast variables by allowing language bindings to store bytes in the UDF that will be passed back to them. I don't think we need to support accumulators, right? |
I took the idea from @rxin to simplify CachedOnceRDD and created #8833; @justinuang please help review it. |
I think this should work? #8835 |
@justinuang #8835 is already merged, can you close this PR? thanks! |
Thanks for the reminder! |