
[SPARK-23030][SQL][PYTHON] Use Arrow stream format for creating from and collecting Pandas DataFrames #21546


Closed

Conversation

@BryanCutler (Member) commented Jun 12, 2018

What changes were proposed in this pull request?

This changes toPandas() and createDataFrame() to use the Arrow stream format when Arrow is enabled. Previously, Arrow data was written to byte arrays where each chunk was the output of the Arrow file format. This was mainly due to constraints at the time, and it caused some overhead from writing the schema/footer on each chunk of data and then having to read multiple Arrow file inputs and concatenate them together.

Using the Arrow stream format improves on this with better performance, lower memory overhead for the average case, and simpler code. Here are the details of this change:
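For illustration, here is a minimal pyarrow sketch (not Spark code) contrasting the two IPC formats: the file format carries a schema and footer per file, while the stream format writes the schema once followed by record batches.

```python
import io
import pandas as pd
import pyarrow as pa

batch = pa.RecordBatch.from_pandas(
    pd.DataFrame({"id": [0, 1, 2, 3], "x1": [0.1, 0.2, 0.3, 0.4]}),
    preserve_index=False)

# File format: each chunk written this way is a complete file with schema,
# record batches, and a footer.
file_sink = io.BytesIO()
file_writer = pa.RecordBatchFileWriter(file_sink, batch.schema)
file_writer.write_batch(batch)
file_writer.close()

# Stream format: the schema is written once up front, then record batches,
# then an end-of-stream marker.
stream_sink = io.BytesIO()
stream_writer = pa.RecordBatchStreamWriter(stream_sink, batch.schema)
stream_writer.write_batch(batch)
stream_writer.close()

print(len(file_sink.getvalue()), len(stream_sink.getvalue()))  # the stream output is smaller
```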

toPandas()

Before:
Spark internal rows are converted to the Arrow file format; each group of records is a complete Arrow file that contains the schema and other metadata. Next, a collect is done and the result is an array of Arrow files. Each Arrow file is then sent to the Python driver, which loads each file and concatenates them into a single Pandas DataFrame.

After:
Spark internal rows are converted to ArrowRecordBatches directly, which is the simplest Arrow component for IPC data transfers. The driver JVM then immediately starts serving data to Python as an Arrow stream, sending the schema first. It then starts a Spark job with a custom handler that sends Arrow RecordBatches to Python. Partitions arriving in order are sent immediately, and out-of-order partitions are buffered until the ones that precede them come in. This improves performance, simplifies memory usage on the executors, and improves the average memory usage on the JVM driver. Since the order of partitions must be preserved, the worst case is that the first partition is the last to arrive, in which case all data must be buffered in memory until then. This case is no worse than before, when a full collect was done.
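As a rough illustration of the ordering logic (a hypothetical Python sketch; the actual handler is implemented in Scala on the driver), partitions are buffered only until they are contiguous with what has already been sent:

```python
# Hypothetical sketch of the driver-side ordering logic; names are illustrative only.
def serve_partitions_in_order(results, send_to_python):
    """`results` yields (partition_index, arrow_batches) in completion order."""
    buffered = {}
    next_index = 0
    for partition_index, batches in results:
        buffered[partition_index] = batches
        # Flush every partition that is now contiguous with what was already sent.
        while next_index in buffered:
            send_to_python(buffered.pop(next_index))
            next_index += 1
```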

createDataFrame()

Before:
A Pandas DataFrame is split into parts and each part is made into an Arrow file. Each file is then prefixed by its buffer size and written to a temp file. The temp file is read and each Arrow file is parallelized as a byte array.

After:
A Pandas DataFrame is split into parts, then an Arrow stream is written to a temp file where each part is an ArrowRecordBatch. The temp file is read as a stream and the Arrow messages are examined. If a message is an ArrowRecordBatch, the data is saved as a byte array. After reading the file, each ArrowRecordBatch is parallelized as a byte array. This requires slightly more processing than before because we must inspect each Arrow message to extract the record batches, but performance ends up a little better. It is also cleaner in the sense that IPC from Python to the JVM is done over a single Arrow stream.
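A minimal pyarrow sketch of the Python-side half of this flow (illustrative, not the actual pyspark serializer code): all slices share one stream, and each record batch can later be kept as an independent byte array, roughly what the JVM side parallelizes.

```python
import io
import numpy as np
import pandas as pd
import pyarrow as pa

pdf = pd.DataFrame(np.random.rand(1000, 4), columns=["x1", "x2", "x3", "x4"])
slices = np.array_split(pdf, 8)          # one slice per target partition

# Write all slices as record batches into a single Arrow stream.
sink = io.BytesIO()
writer = None
for pdf_slice in slices:
    batch = pa.RecordBatch.from_pandas(pdf_slice, preserve_index=False)
    if writer is None:
        writer = pa.RecordBatchStreamWriter(sink, batch.schema)   # schema written once
    writer.write_batch(batch)
writer.close()

# Read the stream back and keep each record batch as an independent byte array.
reader = pa.RecordBatchStreamReader(io.BytesIO(sink.getvalue()))
batch_bytes = []
while True:
    try:
        batch_bytes.append(reader.read_next_batch().serialize().to_pybytes())
    except StopIteration:
        break
```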

How was this patch tested?

Added new unit tests in Scala for the additions to ArrowConverters; existing Python tests cover the Python changes.

Performance Tests - toPandas

Tests run on a 4 node standalone cluster with 32 cores total, Ubuntu 14.04.1 and OpenJDK 8.
Measured wall clock time to execute toPandas() and took the average best time of 5 runs / 5 loops each.

Test code

```python
import time
from pyspark.sql.functions import rand

df = spark.range(1 << 25, numPartitions=32).toDF("id").withColumn("x1", rand()).withColumn("x2", rand()).withColumn("x3", rand()).withColumn("x4", rand())
for i in range(5):
    start = time.time()
    _ = df.toPandas()
    elapsed = time.time() - start
```
Current Master | This PR
---------------|--------
5.803557 | 5.16207
5.409119 | 5.133671
5.493509 | 5.147513
5.433107 | 5.105243
5.488757 | 5.018685

Avg Master | Avg This PR
-----------|------------
5.5256098 | 5.1134364

Speedup of 1.08060595

Performance Tests - createDataFrame

Tests run on a 4 node standalone cluster with 32 cores total, Ubuntu 14.04.1 and OpenJDK 8.
Measured wall clock time to execute createDataFrame() and get the first record; took the average best time of 5 runs / 5 loops each.

Test code

```python
import gc
import time
import numpy as np
import pandas as pd

def run():
    pdf = pd.DataFrame(np.random.rand(10000000, 10))
    spark.createDataFrame(pdf).first()

for i in range(6):
    start = time.time()
    run()
    elapsed = time.time() - start
    gc.collect()
    print("Run %d: %f" % (i, elapsed))
```
Current Master | This PR
---------------|--------
6.234608 | 5.665641
6.32144 | 5.3475
6.527859 | 5.370803
6.95089 | 5.479151
6.235046 | 5.529167

Avg Master | Avg This PR
-----------|------------
6.4539686 | 5.4784524

Speedup of 1.178064192

Memory Improvements

toPandas()

The most significant improvement is the reduction of the upper-bound space complexity in the JVM driver. Before, the entire dataset was collected in the JVM before being sent to Python. With this change, as soon as a partition is collected, the result handler immediately sends it to Python, so the upper bound is the size of the largest partition. Also, using the Arrow stream format is more efficient because the schema is written once per stream, followed by the record batches. The schema is now only sent once from the driver JVM to Python. Before, multiple Arrow file formats were used, each containing the schema; these duplicated schemas were created in the executors, sent to the driver JVM, and then to Python, where all but the first one received were discarded.

I verified the upper-bound limit by running a test that collects more data than the driver JVM memory available, using these settings on a standalone cluster:

```
spark.driver.memory 1g
spark.executor.memory 5g
spark.sql.execution.arrow.enabled true
spark.sql.execution.arrow.fallback.enabled false
spark.sql.execution.arrow.maxRecordsPerBatch 0
spark.driver.maxResultSize 2g
```

Test code:

```python
from pyspark.sql.functions import rand
df = spark.range(1 << 25, numPartitions=32).toDF("id").withColumn("x1", rand()).withColumn("x2", rand()).withColumn("x3", rand())
df.toPandas()
```

This makes a total data size of 33554432 × 8 × 4 = 1073741824 bytes (1 GiB), which exceeds the 1g of driver JVM memory.
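For reference, the arithmetic behind that figure (1 << 25 rows, 4 columns of 8 bytes each):

```python
rows = 1 << 25           # 33554432
cols = 4                 # id, x1, x2, x3
bytes_per_value = 8      # 64-bit long/double
rows * cols * bytes_per_value  # 1073741824 bytes = 1 GiB, larger than spark.driver.memory=1g
```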

With the current master, it fails with OOM but passes using this PR.

createDataFrame()

No significant change in memory, except that using the stream format instead of separate file formats avoids duplicating the schema, similar to toPandas() above. The process of reading the stream and parallelizing the batches does cause the record batch message metadata to be copied, but its size is insignificant.

@BryanCutler (Member Author) commented Jun 12, 2018

This is a WIP because I had to hack up some of the message processing code in Arrow. This should be done in Arrow, and then it can be cleaned up here. I will make these changes for version 0.10.0 and complete this once we have upgraded.

I'll add some details of tests I ran to compare before/after also.

private[sql] def getBatchesFromStream(in: SeekableByteChannel): Iterator[Array[Byte]] = {

// TODO: simplify in super class
class RecordBatchMessageReader(inputChannel: SeekableByteChannel) {
Member Author

I had to modify the existing Arrow code to allow for this, but I will work on getting these changes into Arrow for 0.10.0 and then this class can be simplified a lot.

@icexelloss (Contributor) commented Jun 14, 2018

Yeah, this does seem pretty complicated. I suppose you didn't use

public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch)

in MessageSerializer in order to avoid a double copy?

Contributor

I wonder if this works?
In a for-loop:
(1) Read the next batch into a VectorSchemaRoot (copy into Arrow memory).
(2) Use VectorUnloader to unload the VectorSchemaRoot to an ArrowRecordBatch (no copy).
(3) Use MessageSerializer.serialize to write the ArrowRecordBatch to a ByteChannel (copy from Arrow memory to Java memory).

It seems we cannot directly read from the socket into Java memory anyway (we have to go through the Arrow memory allocator)..

@icexelloss (Contributor) commented Jun 18, 2018

@BryanCutler Now that I've looked at this more, I think I understand what you are trying to do: the whole class here is trying to read Arrow record batches from a stream into Java on-heap memory without going through Arrow off-heap memory, is that correct?

Also, this function is only used for pandas DataFrame -> Spark DataFrame?

@icexelloss (Contributor) commented Jun 19, 2018

Sure. The main reason I am not sure about this code is that it breaks encapsulation.

If I understand correctly, the Arrow reader only supports reading a record batch from a Channel into Arrow memory. In order to read a record batch from a Channel into on-heap memory directly, we need to subclass MessageChannelReader and override readNextMessage to load both the metadata and the body of the record batch.

The main point I am not comfortable with in this approach: the subclass changes the behavior of readNextMessage to load both the metadata and the body of a record batch, whereas the parent class only loads the metadata. I think this is also the contract of the MessageReader interface, so this feels a bit hacky.

I am not saying I am totally against this for performance reasons, but considering the code path already involves writing data to disk (so avoiding one memory copy won't necessarily gain us much) and it is one of the less frequent operations (pandas DataFrame -> Spark DataFrame), I am not sure it's worth it. That's why I suggest resolving this separately so as not to block this PR.

Contributor

Btw, I don't mind discussing it in this PR either. Also curious what other people think.

Member Author

Ok... I don't really need to subclass MessageChannelReader here. What if instead we just add a couple of static functions on the Arrow side to help with the details of processing messages, like:

```java
public class MessageChannelReader {
  ...

  public static Integer readMessageLength(ReadChannel in) {..}

  public static Message loadMessage(ReadChannel in, int messageLength, ByteBuffer buffer) {..}
}
```

Is that better?

@icexelloss (Contributor) commented Jun 22, 2018

Yeah.. one way to do it is to write a new MessageReader interface to read Arrow messages from a Channel:

```java
public class OnHeapMessageChannelReader {
  /**
   * Read the next message in the sequence.
   *
   * @return The read message or null if reached the end of the message sequence
   * @throws IOException
   */
  Message readNextMessage() throws IOException;

  /**
   * When a message is followed by a body of data, read that data into an on-heap ByteBuffer.
   * This should only be called when a Message has a body length > 0.
   *
   * @param message Read message that is followed by a body of data
   * @return A ByteBuffer containing the body of the message that was read
   * @throws IOException
   */
   ByteBuffer readMessageBody(Message message) throws IOException;

   ...
}
```

We might need to duplicate some logic in https://github.com/apache/arrow/blob/master/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageChannelReader.java#L33

For record batches it's not too bad because the logic is pretty simple, but the downside is that we will be using low-level Arrow APIs, which might not be guaranteed to be stable. What do you think?

@BryanCutler what kind of static functions do you think we need to add on the Arrow side?

Member Author

I was just referring to the two static functions in the previous post. These would contain most of the low-level operations, for stability. I'm not sure we need a new interface to handle this case; it's probably not a common use case. I'll just implement what I had in mind and maybe it will become more clear.

assert(arrowPayloads.length == indexData.rdd.getNumPartitions)
val arrowBatches = indexData.getArrowBatchRdd.collect()
assert(arrowBatches.nonEmpty)
assert(arrowBatches.length == indexData.rdd.getNumPartitions)
Member Author

Most of these changes are just renames to be consistent

@SparkQA commented Jun 13, 2018

Test build #91733 has finished for PR 21546 at commit a5a1fbe.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

arrowBatchRdd,
(ctx: TaskContext, it: Iterator[Array[Byte]]) => it.toArray,
0 until numPartitions,
handlePartitionBatches)
Member

Instead of collecting all partitions back at once and holding out-of-order partitions in the driver while waiting for the in-order ones, is it better to incrementally run jobs on partitions in order and send streams to the Python side? Then we don't need to hold out-of-order partitions in the driver.

Member

+1 to chunking if we could. I recall Bryan said that for grouped UDFs we need the entire set.

Also, not sure if on the Python side we have any assumption about how much of the partition is in each chunk (there shouldn't be?)

@BryanCutler (Member Author) commented Jun 14, 2018

> is it better to incrementally run job on partitions in order

I believe this is how toLocalIterator works, right? I tried using that because it only keeps 1 partition in memory at a time, but the performance took quite a hit from the multiple jobs. I think we should still prioritize performance over memory for toPandas(), since it's assumed the data to be collected is relatively small.

I did have another idea though: we could stream all partitions to Python out of order, then follow with another small batch of data that contains a map of partitionIndex to orderReceived. Then the partitions could be put into order on the Python side before making the Pandas DataFrame, roughly as sketched below.
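A rough Python-side sketch of that idea (hypothetical names, not the eventual implementation), assuming for simplicity one record batch per partition:

```python
# Hypothetical sketch: reorder batches on the Python side using a
# partition-index map sent after the data.
import pyarrow as pa

def to_pandas_from_unordered(batches_in_arrival_order, partition_index_by_arrival):
    # Pair each batch with the partition index it belongs to, then sort.
    ordered = sorted(zip(partition_index_by_arrival, batches_in_arrival_order),
                     key=lambda pair: pair[0])
    return pa.Table.from_batches([batch for _, batch in ordered]).to_pandas()
```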

@BryanCutler (Member Author) commented Jun 14, 2018

> +1 chunking if we could. I recall Bryan said for grouped UDF we need the entire set.

This still keeps Arrow record batches chunked within each partition, which can help executor memory, but it doesn't do anything for the driver side because we still need to collect the entire partition in the driver JVM.

> Also not sure if python side we have any assumption on how much of the partition is in each chunk (there shouldn't be?)

No, Python doesn't care how many chunks the data is in; that's handled by pyarrow.
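A small pyarrow sketch of why chunking is transparent to the Python side (illustrative only):

```python
# pyarrow concatenates however many batches it is given, so the Python side
# makes no assumption about how a partition was chunked.
import pandas as pd
import pyarrow as pa

pdf = pd.DataFrame({"id": range(10), "x1": [float(i) for i in range(10)]})
batches = [pa.RecordBatch.from_pandas(chunk, preserve_index=False)
           for chunk in (pdf.iloc[:3], pdf.iloc[3:7], pdf.iloc[7:])]  # arbitrary chunking
result = pa.Table.from_batches(batches).to_pandas()  # identical regardless of chunk count
```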

Contributor

> I did have another idea though, we could stream all partitions to Python out of order, then follow with another small batch of data that contains maps of partitionIndex to orderReceived. Then the partitions could be put into order on the Python side before making the Pandas DataFrame.

This sounds good!

Contributor

I guess in the worst-case scenario the driver still needs to hold all batches in memory, for example if all the batches arrive at the same time.

I wonder if there is a way to:
(1) Compute all tasks in parallel; once tasks are done, store the results in the block manager on the executors.
(2) Return all block ids to the driver.
(3) Have the driver fetch each block and stream it individually.

This way at least the computation is done in parallel. Fetching the results sequentially is a trade-off of speed vs. memory, something we or the user can choose, but I imagine fetching some 10-20 GB of data from the executors sequentially shouldn't be too bad.

sqlContext: SQLContext,
filename: String,
schemaString: String): DataFrame = {
JavaSparkContext.fromSparkContext(sqlContext.sparkContext)
Member

What is this line for?

Member Author

Oops, nothing! I must have forgotten to delete it, thanks!

* @param sqlContext The active [[SQLContext]].
* @return The converted [[DataFrame]].
*/
def arrowPayloadToDataFrame(
payloadRDD: JavaRDD[Array[Byte]],
def arrowStreamToDataFrame(
Member

Seems like this could be a private method now?

Member Author

It's public so it can be called from Python with Py4J.

Member Author

Oh right, this is only called by the function below, so I suppose we don't even need it.

* Read a file as an Arrow stream and return an RDD of serialized ArrowRecordBatches.
*/
private[sql] def readArrowStreamFromFile(sqlContext: SQLContext, filename: String):
JavaRDD[Array[Byte]] = {
Member

indentation

val batchIter = ArrowConverters.toBatchIterator(inputRows.toIterator, schema, 5, null, ctx)

// Write batches to Arrow stream format as a byte array
val out = new ByteArrayOutputStream()
Member

Can we use Utils.tryWithResource { ... }?

Member Author

This doesn't actually need to be closed, but I should be closing the DataOutputStream, so I'll put that in tryWithResource

Member Author

done

@@ -184,24 +184,28 @@ def loads(self, obj):
raise NotImplementedError


class ArrowSerializer(FramedSerializer):
class ArrowSerializer(Serializer):
Member

Should we rename this?

Member Author

Maybe ArrowStreamSerializer?

@gatorsmile (Member)

cc @ueshin

@HyukjinKwon (Member)

retest this please

@SparkQA commented Aug 28, 2018

Test build #95340 has finished for PR 21546 at commit 2fe46f8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member)

retest this please

@SparkQA commented Aug 28, 2018

Test build #95363 has finished for PR 21546 at commit 2fe46f8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member) left a comment

Thanks for this great work!

A general question: if the Arrow stream format has a bug, can we still use the original Arrow file format? My major concern is a regression. Can we keep both in the code?

@HyukjinKwon (Member)

It's not a bug. Here's what's going on now:

He proposed a way to write out regardless of partition order; I suggested reverting that part (see #21546 (comment)) since that code path looked complicated and can proceed orthogonally, and it can also be applied to other PySpark code paths.

This one still has some improvements (see #21546 (comment)).

@BryanCutler (Member Author)

@gatorsmile, this is just the format for Arrow IPC between the JVM and the Python process, and although it used the Arrow file format, nothing is persisted. There is no real reason to keep both formats: the stream format is better for our purposes and it's already what is being used for pandas_udfs, so a bug in the Arrow format itself is unlikely. As with any change a bug is possible, but this has been tested pretty thoroughly, and trying to keep the old code would get really messy and complicated.

@gatorsmile (Member)

@BryanCutler The worst case is to turn off spark.sql.execution.arrow.enabled, if the new code path has a bug, right?

@HyukjinKwon (Member)

Yup, since spark.sql.execution.arrow.enabled is an experimental feature, we could just turn this off if there are critical bugs found later after the release.
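For reference, a one-line illustration of that fallback (config keys as listed in the PR description):

```python
# Illustration: the Arrow code path can be disabled entirely if a critical bug
# is found, falling back to the non-Arrow conversion.
spark.conf.set("spark.sql.execution.arrow.enabled", "false")

# Or keep Arrow enabled but allow automatic fallback when the Arrow path fails:
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "true")
```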

@BryanCutler (Member Author) commented Aug 29, 2018 via email

@HyukjinKwon (Member)

retest this please

@BryanCutler (Member Author)

@HyukjinKwon I redid the benchmarks for toPandas with the current code and updated the description. It's not a huge speedup now, but it definitely does improve some. I'll also follow up with another PR with the out-of-order batches to improve this even further. Let me know if this looks ok to you (pending tests). Thanks!

@SparkQA commented Aug 29, 2018

Test build #95390 has finished for PR 21546 at commit ffb47cb.

  • This patch fails from timeout after a configured wait of `400m`.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ArrowEvalPython(udfs: Seq[PythonUDF], output: Seq[Attribute], child: LogicalPlan)
  • case class BatchEvalPython(udfs: Seq[PythonUDF], output: Seq[Attribute], child: LogicalPlan)

@SparkQA commented Aug 29, 2018

Test build #95397 has finished for PR 21546 at commit ffb47cb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ArrowEvalPython(udfs: Seq[PythonUDF], output: Seq[Attribute], child: LogicalPlan)
  • case class BatchEvalPython(udfs: Seq[PythonUDF], output: Seq[Attribute], child: LogicalPlan)

@HyukjinKwon (Member)

Merged to master.

@asfgit asfgit closed this in 82c18c2 Aug 29, 2018
@BryanCutler (Member Author)

Thanks @HyukjinKwon and others who reviewed!

return RDD(jrdd, self, serializer)

def _serialize_to_jvm(self, data, parallelism, serializer):
def _serialize_to_jvm(self, data, serializer, reader_func):
Contributor

Hi, sorry for the late review here, and this is more just a question for myself: is this aspect tested at all? IIUC, it would be used in spark.createDataFrame, but the tests in session.py don't have Arrow enabled, right?

Not that I see a bug; mostly just wondering, as I was looking at making my own changes here, and it would be nice if I knew there were some tests.

Contributor

(if not, I can try to address this in some other work)

Member Author

Hey @squito, yes that's correct, this is in the path that the ArrowTests with createDataFrame exercise. These tests are skipped if pyarrow is not installed, but for our Jenkins tests it is installed under the Python 3.5 env, so it gets tested there.

It's a little subtle to see that they were run, since the test output only shows when tests are skipped. You can see that for Python 2.7 the ArrowTests show as skipped, but for 3.5 they do not.

Member Author

I made https://issues.apache.org/jira/browse/SPARK-25272 which will give a clearer output showing that the ArrowTests were run.

Member

To be honest, I worry about the test coverage of PySpark in general. Can anybody in PySpark lead the effort to propose a solution for improving the test coverage?

Member

Although most parts of PySpark should be covered by Spark Core and SQL, PySpark is starting to have more and more PySpark-only features, and I am not very sure how well they are tested.

Contributor

Thanks @BryanCutler , sorry I didn't know where to look for those, they look much better than what I would have added!

@HyukjinKwon (Member)

@gatorsmile, I'm working on a test coverage report and it's already almost finished. The only job left is to set up a Jenkins job, as I mentioned. cc @shaneknapp

@HyukjinKwon (Member) commented Aug 30, 2018

For now, you can run https://github.com/apache/spark/blob/master/python/run-tests-with-coverage script to check the coverage.

asfgit pushed a commit that referenced this pull request Dec 6, 2018
…ord batches to improve performance

## What changes were proposed in this pull request?

When executing `toPandas` with Arrow enabled, partitions that arrive in the JVM out-of-order must be buffered before they can be sent to Python. This causes excess memory to be used in the driver JVM and increases the time it takes to complete because data must sit in the JVM waiting for preceding partitions to come in.

This change sends un-ordered partitions to Python as soon as they arrive in the JVM, followed by a list of partition indices so that Python can assemble the data in the correct order. This way, data is not buffered at the JVM and there is no waiting on particular partitions so performance will be increased.

Followup to #21546

## How was this patch tested?

Added a new test with a large number of batches per partition, and a test that forces a small delay in the first partition. These test that partitions are collected out-of-order and then put in the correct order in Python.

## Performance Tests - toPandas

Tests run on a 4 node standalone cluster with 32 cores total, 14.04.1-Ubuntu and OpenJDK 8
measured wall clock time to execute `toPandas()` and took the average best time of 5 runs/5 loops each.

Test code
```python
df = spark.range(1 << 25, numPartitions=32).toDF("id").withColumn("x1", rand()).withColumn("x2", rand()).withColumn("x3", rand()).withColumn("x4", rand())
for i in range(5):
	start = time.time()
	_ = df.toPandas()
	elapsed = time.time() - start
```

Spark config
```
spark.driver.memory 5g
spark.executor.memory 5g
spark.driver.maxResultSize 2g
spark.sql.execution.arrow.enabled true
```

Current Master w/ Arrow stream | This PR
---------------------|------------
5.16207 | 4.342533
5.133671 | 4.399408
5.147513 | 4.468471
5.105243 | 4.36524
5.018685 | 4.373791

Avg Master | Avg This PR
------------------|--------------
5.1134364 | 4.3898886

Speedup of **1.164821449**

Closes #22275 from BryanCutler/arrow-toPandas-oo-batches-SPARK-25274.

Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
dongjoon-hyun pushed a commit that referenced this pull request Aug 27, 2019
…nection thread to propagate errors

### What changes were proposed in this pull request?

This PR proposes to backport #24834 with minimised changes, and the tests added at #25594.

#24834 was not backported before because basically it targeted a better exception by propagating the exception from JVM.

However, actually this PR fixed another problem accidentally (see  #25594 and [SPARK-28881](https://issues.apache.org/jira/browse/SPARK-28881)). This regression seems introduced by #21546.

The root cause is that

https://github.com/apache/spark/blob/23bed0d3c08e03085d3f0c3a7d457eedd30bd67f/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L3370-L3384

`runJob` with `resultHandler` seems able to write partial output.

The JVM throws an exception, but since the JVM exception is not propagated to the Python process, the Python process doesn't know whether an exception was thrown in the JVM (it just sees the socket close), which results in the following:

```
./bin/pyspark --conf spark.driver.maxResultSize=1m
```
```python
spark.conf.set("spark.sql.execution.arrow.enabled",True)
spark.range(10000000).toPandas()
```
```
Empty DataFrame
Columns: [id]
Index: []
```

With this change, the Python process catches exceptions from the JVM.

### Why are the changes needed?

It returns incorrect data, and potentially returns partial results when an exception happens on the JVM side. This is a regression; the code works fine in Spark 2.3.3.

### Does this PR introduce any user-facing change?

Yes.

```
./bin/pyspark --conf spark.driver.maxResultSize=1m
```
```python
spark.conf.set("spark.sql.execution.arrow.enabled",True)
spark.range(10000000).toPandas()
```

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../pyspark/sql/dataframe.py", line 2122, in toPandas
    batches = self._collectAsArrow()
  File "/.../pyspark/sql/dataframe.py", line 2184, in _collectAsArrow
    jsocket_auth_server.getResult()  # Join serving thread and raise any exceptions
  File "/.../lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/.../pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/.../lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o42.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
    ...
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1 tasks (6.5 MB) is bigger than spark.driver.maxResultSize (1024.0 KB)
```

now throws an exception as expected.

### How was this patch tested?

Manually as described above. unittest added.

Closes #25593 from HyukjinKwon/SPARK-27992.

Lead-authored-by: HyukjinKwon <gurwls223@apache.org>
Co-authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
rluta pushed a commit to rluta/spark that referenced this pull request Sep 17, 2019
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Sep 26, 2019