[SPARK-8632] [SQL] [PYSPARK] Poor Python UDF performance because of RDD caching #8662

Closed
justinuang wants to merge 3 commits

Conversation

justinuang

[SPARK-8632] [SQL] [PYSPARK] Poor Python UDF performance because of RDD caching

  • I wanted to reuse most of the logic from PythonRDD, so I pulled out
    two methods, writeHeaderToStream and readPythonProcessSocket.
  • worker.py now has a switch where it reads an int that tells it either
    to go into normal PySpark RDD mode, which is meant for a streaming
    two-thread workflow, or into PySpark UDF mode, which is meant to be
    called synchronously (see the sketch below).
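
A minimal sketch of that dispatch idea, for illustration only (read_int, the mode constants, and the rdd_main/udf_main callables are hypothetical placeholders; the real worker.py protocol and framing differ):

import struct

# Hypothetical mode markers; the real constants would live in the worker protocol.
PYSPARK_RDD_MODE = 0
PYSPARK_UDF_MODE = 1

def read_int(stream):
    # Big-endian 4-byte int, mirroring the framing PySpark uses on the worker socket.
    data = stream.read(4)
    if len(data) < 4:
        raise EOFError("stream closed")
    return struct.unpack("!i", data)[0]

def serve(infile, outfile, rdd_main, udf_main):
    # Dispatch on a single int read from the JVM before doing any other work.
    mode = read_int(infile)
    if mode == PYSPARK_RDD_MODE:
        rdd_main(infile, outfile)   # streaming, two-thread workflow
    elif mode == PYSPARK_UDF_MODE:
        udf_main(infile, outfile)   # synchronous request/response loop
    else:
        raise ValueError("unknown worker mode: %d" % mode)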

@justinuang
Author

@davies @JoshRosen @rxin

@rxin
Contributor

rxin commented Sep 9, 2015

Jenkins, ok to test.

@justinuang
Author

Should the build have started by now?

@rxin
Contributor

rxin commented Sep 9, 2015

Yes - Mr. Jenkins doesn't like me anymore.

@SparkQA

SparkQA commented Sep 9, 2015

Test build #1731 has finished for PR 8662 at commit af5254b.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class PySparkMode(object):

@davies
Contributor

davies commented Sep 10, 2015

@justinuang Thanks for working on this. Do you have any numbers on the performance improvement from this?

@justinuang
Author

Hey davies, I don't have any numbers. Are there any benchmarks that we can just rerun?

@davies
Contributor

davies commented Sep 10, 2015

Unfortunately, we haven't done any benchmarks for Python UDFs yet. Can you do one or two simple cases?

@justinuang
Author

Is there an example of another benchmark? I'm not sure where they're stored for Python.

@davies
Contributor

davies commented Sep 10, 2015

I think something like this could be enough:

import time

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

tofloat = udf(lambda x: float(x), DoubleType())  # DoubleType must be instantiated
df = sqlContext.range(1 << 25)
dff = df.select(tofloat(df.id).alias('f'))
start = time.time()
dff.filter(dff.f > 0).count()  # make sure the Python UDF is evaluated
used = time.time() - start

@SparkQA

SparkQA commented Sep 10, 2015

Test build #1737 has finished for PR 8662 at commit af5254b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class PySparkMode(object):

@justinuang
Author

Sorry for the delay. Here is the code I ran, and here are the results:

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import time
mult = udf(lambda x: 2 * x, IntegerType())

for i in range(0,6):
    df = sqlContext.range(1000000).withColumnRenamed("id", "f")
    for j in range(i):
        df = df.select(mult(df.f).alias('f'))

    start = time.time()
    df.count() # make sure the Python UDF is evaluated
    used = time.time() - start
    print "Number of udfs: {} - {}".format(i, used)

The results are as expected. The Python overhead is about 1.5 seconds, but you can see how the time grows exponentially without the fix, since the cost of computing the upstream twice includes the expensive Python operations themselves.

With fix
Number of udfs: 0 - 0.091050863266
Number of udfs: 1 - 1.72215199471
Number of udfs: 2 - 3.32698297501
Number of udfs: 3 - 5.64863801003
Number of udfs: 4 - 7.06328701973
Number of udfs: 5 - 9.22025489807

Without fix
Number of udfs: 0 - 1.00539588928
Number of udfs: 1 - 3.12671899796
Number of udfs: 2 - 5.91188406944
Number of udfs: 3 - 11.124516964
Number of udfs: 4 - 24.3277280331
Number of udfs: 5 - 47.621573925

@punya
Contributor

punya commented Sep 14, 2015

👍

@davies
Contributor

davies commented Sep 14, 2015

@justinuang Thanks for the numbers, they look interesting. The result with no UDFs looks strange; could you run each query twice and use the best (or second) result?

@justinuang
Author

Looks like your intuition was right. The second run is slightly faster, so I ran the loop twice and took the numbers from the second pass.

Here are the updated numbers

With fix
Number of udfs: 0 - 0.0953350067139
Number of udfs: 1 - 1.73201990128
Number of udfs: 2 - 3.41883206367
Number of udfs: 3 - 5.24572992325
Number of udfs: 4 - 6.83000802994
Number of udfs: 5 - 8.59465384483

Without fix
Number of udfs: 0 - 0.0891687870026
Number of udfs: 1 - 1.53674888611
Number of udfs: 2 - 4.44895505905
Number of udfs: 3 - 10.0561971664
Number of udfs: 4 - 21.5314221382
Number of udfs: 5 - 43.887141943 

It does look like there's a small performance drop for one UDF. My guess is that the initial approach was slightly cheating on CPU time: it had three threads that could do serialization and deserialization work at the same time. However, that breaks the RDD abstraction that each partition should only get one thread to do CPU work.

@justinuang
Author

Jenkins, retest this please

@justinuang
Author

@rxin or @davies, why is this not automatically retriggering when I push a new commit? Also, it looks like "retest this please" only works for committers.

@SparkQA

SparkQA commented Sep 16, 2015

Test build #1765 has finished for PR 8662 at commit 7fe4a0e.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class _PySparkMode(object):

@justinuang
Author

@davies how do I make a class private in Python?

In addition, is it possible that the failing unit test is flaky? I ran

./run-tests --python-executables=python

and it succeeds locally.

@davies
Contributor

davies commented Sep 16, 2015

@justinuang This patch works pretty well for multiple UDFs, but I have two concerns before reviewing the details: 1) it has some per-batch overhead, causing a regression for a single UDF; 2) there are lots of changes in PythonRDD and worker.py, and Python UDFs now work differently from other PythonRDD uses, which increases the complexity (and the potential for bugs).

There is another approach, as we discussed before: use a better cache for the upstream RDD, which could be called CachedOnceRDD. It appends all the rows to a buffer when compute() is called the first time, then pulls and removes the rows when compute() is called the second time. This CachedOnceRDD should work similarly to UnsafeExternalSorter (spilling to disk if there is not enough memory). I think this approach should have better performance than the current one without changing PythonRDD (which is already very complicated). I really want to try this out, but I haven't had time to work on it.

@rxin
Contributor

rxin commented Sep 17, 2015

Hey @davies - I was not involved with Python UDFs earlier, but why does calling UDFs require caching? Isn't it really bad if the partition is large?

It doesn't make a lot of sense to me that we would need to materialize the entire partition of data for calling UDFs.

@davies
Contributor

davies commented Sep 17, 2015

After an offline discussion with @rxin @marmbrus, we can try another approach: pass the entire row into the Python worker (it increases the IO between the JVM and Python, but doesn't block). I can try it tonight.

@justinuang
Author

Hey @davies, I think the performance regression for a single UDF may be because there were multiple threads per task that could be taking up CPU time. I highly doubt that the IO over loopback actually adds much time compared to the time spent deserializing and serializing the individual items in the row.

The other approach of passing the entire row could be okay, and it doesn't add a lot of changes to PythonRDD and Python UDFs, but I'm afraid the cost of serializing the entire row could be prohibitive. After all, isn't serialization from in-memory JVM types to the pickled representation the most expensive part? What if I have a giant row of 100 columns and I only want to run a UDF on one column? Do I need to pickle the entire row?
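
A rough standalone illustration of that concern (this is not Spark code; the column count, row shape, and pickle protocol are arbitrary): pickling a 100-column row for every record costs far more than pickling the single value the UDF actually uses.

import pickle
import time

wide_row = tuple(range(100))   # stand-in for a 100-column row
one_column = wide_row[0]       # the single column the UDF actually needs
n = 100000

start = time.time()
for _ in range(n):
    pickle.dumps(wide_row, protocol=2)
print("full row:   %.2fs" % (time.time() - start))

start = time.time()
for _ in range(n):
    pickle.dumps(one_column, protocol=2)
print("one column: %.2fs" % (time.time() - start))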

@SparkQA

SparkQA commented Sep 17, 2015

Test build #1766 has finished for PR 8662 at commit 7fe4a0e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class _PySparkMode(object):

@davies
Contributor

davies commented Sep 17, 2015

Here are some benchmarks with different approaches:

Without fix:
Number of udfs: 0 - 0.142887830734
Number of udfs: 1 - 0.948309898376
Number of udfs: 2 - 2.079007864
Number of udfs: 3 - 4.02105379105
Number of udfs: 4 - 8.20960092545
Number of udfs: 5 - 17.1744220257

This PR (synchronized batch):
Number of udfs: 0 - 0.146151065826
Number of udfs: 1 - 1.73253297806
Number of udfs: 2 - 2.73584198952
Number of udfs: 3 - 3.70912384987
Number of udfs: 4 - 5.21050810814
Number of udfs: 5 - 6.24335598946

Pass row around:
Number of udfs: 0 - 0.150754928589
Number of udfs: 1 - 2.33548307419
Number of udfs: 2 - 3.16792798042
Number of udfs: 3 - 4.92886400223
Number of udfs: 4 - 5.86024093628
Number of udfs: 5 - 6.71114897728

Old RDD cache:
Number of udfs: 0 - 0.187371969223
Number of udfs: 1 - 1.12094593048
Number of udfs: 2 - 1.82404088974
Number of udfs: 3 - 3.20661616325
Number of udfs: 4 - 3.62113809586
Number of udfs: 5 - 4.25435996056

Using CachedOnceRDD:
Number of udfs: 0 - 0.143986940384
Number of udfs: 1 - 1.10472989082
Number of udfs: 2 - 1.96430683136
Number of udfs: 3 - 2.14014697075
Number of udfs: 4 - 2.37295603752
Number of udfs: 5 - 2.8883831501

It seems that passing the row into Python is slower than the current approach (and will be much slower with a wider table).

Using a cache is faster than the current approach because of the asynchronous pipeline, and CachedOnceRDD is faster than the RDD cache while also needing less memory.

The CachedOnceRDD looks like this:

import java.util.concurrent.ConcurrentLinkedQueue

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

class CachedOnceRDD(prev: RDD[InternalRow]) extends RDD[InternalRow](prev) {
  override val partitioner = firstParent[InternalRow].partitioner
  override def getPartitions: Array[Partition] = firstParent[InternalRow].partitions

  var visits: Int = 0
  var owner: Int = 0
  var prevIter: Iterator[InternalRow] = null
  var buffer: java.util.Queue[InternalRow] = null

  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    var myid: Int = 0
    synchronized {
      visits += 1
      myid = visits
      if (buffer == null) {
        // The first caller becomes the owner: it pulls from the parent
        // iterator and appends each row to the shared buffer.
        prevIter = prev.iterator(split, context)
        buffer = new ConcurrentLinkedQueue[InternalRow]
        owner = myid
      }
    }
    new Iterator[InternalRow] {
      var row: InternalRow = null

      override def hasNext: Boolean = {
        if (row == null) {
          if (owner == myid) {
            // Owner path: read from the parent and remember the row for the other caller.
            buffer.synchronized {
              if (prevIter != null) {
                if (prevIter.hasNext) {
                  row = prevIter.next().copy()
                  buffer.offer(row)
                } else {
                  prevIter = null
                }
              }
            }
          } else {
            // Non-owner path: drain rows that the owner has already buffered.
            if (!buffer.isEmpty) {
              row = buffer.poll()
            } else {
              buffer.synchronized {
                if (!buffer.isEmpty) {
                  row = buffer.poll()
                } else if (prevIter == null) {
                  // Parent exhausted and buffer empty: nothing left.
                } else {
                  // Buffer empty but the parent still has rows: take over
                  // ownership and read from the parent directly.
                  owner = myid
                  hasNext
                }
              }
            }
          }
        }
        row != null
      }

      override def next(): InternalRow = {
        val r = row
        row = null
        r
      }
    }
  }
}

And we need to change this

-    val childResults = child.execute().map(_.copy())
+    val childResults = new CachedOnceRDD(child.execute())

cc @rxin @marmbrus

@rxin
Contributor

rxin commented Sep 17, 2015

If I'm reading it correctly, this is not "appends all the rows into an array when compute() is called for the first time, then pull and remove the rows when compute() is called second time" ?

@rxin
Contributor

rxin commented Sep 17, 2015

Actually never mind - it does. This seems a lot more complicated still - we would need to handle spilling, and then in that case it would strictly be slower due to the unnecessary spill.

@rxin
Contributor

rxin commented Sep 17, 2015

OK I finally understood what's going on here. If I understand your intention correctly, you are assuming:

  1. The 1st read goes into Python
  2. The 2nd read can keep up and won't lag behind too much of the 1st read

So the memory consumption is somewhat bounded.

If you are doing this, can't you simplify this a lot by removing all the synchronization, the owner thing, and the RDD, and just do the following in the mapPartitions call:

  1. consume the input iterator and add each record to two queues
  2. have Python read from one queue
  3. have the 2nd read (the zip) read from the 2nd queue

You mentioned earlier there might be a chance of deadlock. I don't understand why it would deadlock. But even if it could deadlock with the above approach, you can change it slightly to achieve the same thing as what you have by doing the following in the mapPartitions call (see the sketch after this list):

  1. create an iterator wrapper that adds each consumed record to a blocking queue
  2. for each output record from python, drain an element from the queue
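
For illustration only, here is the shape of that second suggestion sketched in Python (the real change would live on the Scala side, in the PythonRDD code path; the queue size and names here are made up): each record handed to the worker is also placed on a bounded queue, and the result side drains exactly one queued record per UDF output, so the number of buffered rows stays bounded by the queue capacity.

import queue

def forking_iterator(upstream, pending):
    # Yield each record to the worker while also remembering it for the zip.
    for record in upstream:
        pending.put(record)  # blocks if the result side falls too far behind
        yield record

def zip_results(udf_outputs, pending):
    # Pair each UDF output with the input record that produced it.
    for out in udf_outputs:
        yield pending.get(), out

# Usage sketch with a stand-in "UDF" (squaring) instead of a real Python worker.
pending = queue.Queue(maxsize=1024)
inputs = forking_iterator(iter(range(10)), pending)
outputs = (x * x for x in inputs)
for row, result in zip_results(outputs, pending):
    print(row, result)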

@rxin
Contributor

rxin commented Sep 17, 2015

BTW with all of these - I'm starting to think we should just do a quick fix for 1.5.1, and then for 1.6 we would need to rewrite a lot of these in order to use local iterators anyway.

For local iterators, we wouldn't reuse the existing PythonRDD, so the goal to keep only one code path is probably moot.

@justinuang
Author

The solution with the iterator wrapper was my first approach, which I prototyped (http://apache-spark-developers-list.1001551.n3.nabble.com/Python-UDF-performance-at-large-scale-td12843.html). It's dangerous because there is buffering at many levels, so we can run into a deadlock situation:

- NEW: the ForkingIterator LinkedBlockingDeque
- batching the rows before pickling them
- os buffers on both sides
- pyspark.serializers.BatchedSerializer

We can avoid deadlock by being very disciplined. For example, we can have the ForkingIterator always check whether the LinkedBlockingDeque is full and, if so (see the Python-side sketch after this list):

Java
    - flush the java pickling buffer
    - send a flush command to the python process
    - os.flush the java side

Python
    - flush BatchedSerializer
    - os.flush()
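
A sketch of what the Python side of that discipline might look like, for illustration only (FLUSH_MARKER is a made-up command value, and `serializer` stands in for any batching serializer with a flush method; the real BatchedSerializer API and worker framing are different):

import struct

FLUSH_MARKER = -100  # hypothetical command value, not part of the real protocol

def serve(infile, outfile, serializer, handle_batch):
    while True:
        # Same 4-byte big-endian framing idea as the normal batch headers.
        header = struct.unpack("!i", infile.read(4))[0]
        if header == FLUSH_MARKER:
            # The Java side signalled that its queue is full: push out
            # everything we have buffered so it can make progress.
            serializer.flush(outfile)
            outfile.flush()
            continue
        # Otherwise the header describes a normal batch of pickled rows.
        handle_batch(header, infile, outfile, serializer)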

I'm not sure that this UDF performance regression for a single UDF is going to hit many people. For one, most upstreams are not a range() call, which doesn't have to go back to disk and deserialize. My personal opinion is that the blocking performance shouldn't be the reason we reject this approach; the added complexity is the stronger argument.

If we want a quick fix that is safe, I would be in favor of passing the row, which indeed is slower, but better than deadlocking or calculating upstream twice. It's just that the current system is unacceptable.

Maybe we can also consider going with a complete architecture shift that goes with a batching system, but uses thrift to serialize the scala types to a language agnostic format, and also handle the blocking RPC. Then we can have PySpark and SparkR using the same simple UDF architecture. The main drawback is that I'm not sure how we're going to support broadcast variables or aggregators, but should those even be supported with UDFs?

EDIT: On second thought, the cost of serializing massive rows is prohibitive (we have tables with several hundred columns where I work). It feels like the approach in this PR, a complete rearchitecture with Thrift to clean things up, or some clever and well-tested flushing system are the viable options right now.

@justinuang
Author

@rxin what do you mean by local iterators? =) I feel like I'm missing some context that you guys have.

@rxin
Contributor

rxin commented Sep 17, 2015

This is the ticket: https://issues.apache.org/jira/browse/SPARK-9983

@davies
Contributor

davies commented Sep 17, 2015

@rxin The reason CachedOnceRDD looks a little more complicated than expected is that the order of the two callers (the zip and the writer thread) is undefined (they are in two threads). Usually the zip lags behind the writer thread, but the zip could call compute() before the writer thread does. This avoids the deadlock without doing any flushing in the PythonRDD pipeline.

The buffered rows are bounded by the number of rows buffered in PythonRDD, so we could skip spilling here (there is still a chance of OOM, but it's small).

For 1.5.1, the goals are:

  1. compute the upstream once
  2. no deadlock
  3. small risk (fewer changes in PythonRDD)
  4. not much performance regression

Which one should we pick for 1.5.1?

@justinuang
Author

I'm not sure there is a solution that satisfies all the requirements. I can say that this approach addresses 1, 2, and 4 by design. EDIT: If we can't find a solution that satisfies 1, 2, 3, and 4, then I'm okay with not merging something for 1.5.1. I do think we should merge something that addresses 1, 2, and 4 into 1.6.0.

Would you guys support a 1.6.0 UDF implementation that uses Thrift for the RPC and serialization? In general, I think the custom-rolled socket, serialization, and cleanup approach is pretty scary. Those are already solved problems, and then we could support multiple language bindings at the DataFrame level, where I think it's a lot easier to implement. We could even support broadcast variables by allowing language bindings to store bytes in the UDF that will be passed back to them. I don't think we need to support accumulators, right?

@davies
Contributor

davies commented Sep 19, 2015

I took the idea from @rxin to simplify CachedOnceRDD and created #8833. @justinuang, please help review it.

@rxin
Contributor

rxin commented Sep 20, 2015

I think this should work? #8835

@davies
Contributor

davies commented Sep 29, 2015

@justinuang #8835 is already merged, can you close this PR? thanks!

@justinuang
Author

Thanks for the reminder!

@justinuang justinuang closed this Sep 29, 2015