[SPARK-23030][SQL][PYTHON] Use Arrow stream format for creating from and collecting Pandas DataFrames #21546
Conversation
…o stream ordered partitions
…ala tests pass, style pass
…essageChannelReader
…added some comments
This is a WIP because I had to hack up some of the message processing code in Arrow. This should be done in Arrow, and then it can be cleaned up here. I will make these changes for version 0.10.0 and complete this once we have upgraded. I'll also add some details of tests I ran to compare before/after.
private[sql] def getBatchesFromStream(in: SeekableByteChannel): Iterator[Array[Byte]] = {

// TODO: simplify in super class
class RecordBatchMessageReader(inputChannel: SeekableByteChannel) {
I had to modify the existing Arrow code to allow for this, but I will work on getting these changes into Arrow for 0.10.0 and then this class can be simplified a lot.
made https://issues.apache.org/jira/browse/ARROW-2704 to track
Yeah, this does seem pretty complicated. I suppose you didn't use `public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch)` in the message serializer to avoid a double copy?
I wonder if this works?
In a for-loop:
(1) Read next batch into VectorSchemaRoot (copy into arrow memory)
(2) Use VectorUnloader to unload the VectorSchemaRoot to an ArrowRecordBatch (no copy)
(3) Use MessageSerializer.serialize to write ArrowRecordBatch to a ByteChannel (copy from arrow memory to java memory)
Seems that we cannot directly read from the socket into Java memory anyway (we have to go through the Arrow memory allocator).
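For what it's worth, a minimal sketch of that loop, assuming Arrow's Java stream APIs around the 0.10 package layout (`ArrowStreamReader`, `VectorUnloader`, `MessageSerializer`); this only illustrates the three steps above and is not the code in this PR:

```scala
import java.io.{ByteArrayOutputStream, InputStream}
import java.nio.channels.Channels

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.VectorUnloader
import org.apache.arrow.vector.ipc.{ArrowStreamReader, WriteChannel}
import org.apache.arrow.vector.ipc.message.MessageSerializer

def readBatchesToBytes(in: InputStream): Seq[Array[Byte]] = {
  val allocator = new RootAllocator(Long.MaxValue)
  val reader = new ArrowStreamReader(in, allocator)
  val root = reader.getVectorSchemaRoot            // (1) batches are loaded into Arrow (off-heap) memory
  val result = Seq.newBuilder[Array[Byte]]
  while (reader.loadNextBatch()) {
    val batch = new VectorUnloader(root).getRecordBatch   // (2) wraps the loaded buffers, no copy
    val out = new ByteArrayOutputStream()
    // (3) copy from Arrow memory into JVM on-heap bytes
    MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), batch)
    batch.close()
    result += out.toByteArray
  }
  reader.close()
  result.result()
}
```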
@BryanCutler Now that I've looked at this more, I think I understand what you are trying to do: the whole class here is trying to read Arrow record batches from a stream into Java on-heap memory without going through Arrow off-heap memory, is that correct?
Also, this function is only used for pandas DataFrame -> Spark DataFrame?
Sure. The main reason I am not sure about this code is that it breaks encapsulation.
If I understand correctly, the Arrow reader only supports reading a record batch from a Channel into Arrow memory. In order to read a record batch from a Channel directly into on-heap memory, we need to subclass MessageChannelReader and override readNextMessage to load both the metadata and body of the record batch.
Now, the main point that I feel uncomfortable with in this approach: the subclass changes the behavior of readNextMessage to load both the metadata and body of a record batch, whereas the parent class only loads the metadata of a record batch. I think this is also the contract of the MessageReader interface, so this feels a bit hacky.
I am not saying I am totally against this for performance reasons, but considering the code path already involves writing data to disk (so avoiding one memory copy won't necessarily get us much) and is one of the less frequent operations (pandas DataFrame -> Spark DataFrame), I am not sure it's worth it. That's why I suggest resolving this separately so as not to block this PR.
Btw, I don't mind discussing it in this PR too. Also curious what other people think.
Ok... I don't really need to subclass MessageChannelReader
here. What if instead we just make a couple of static functions on the Arrow side to help with the details of processing messages, like:
public class MessageChannelReader {
...
public static Integer readMessageLength(ReadChannel in) {..}
public static Message loadMessage(ReadChannel in, int messageLength, ByteBuffer buffer) {..}
}
Is that better?
Yeah.. one way to do it is to write a new MessageReader interface to read Arrow messages from a Channel:
public class OnHeapMessageChannelReader {
/**
* Read the next message in the sequence.
*
* @return The read message or null if reached the end of the message sequence
* @throws IOException
*/
Message readNextMessage() throws IOException;
/**
* When a message is followed by a body of data, read that data into an on heap ByteBuffer. This should
* only be called when a Message has a body length > 0.
*
* @param message Read message that is followed by a body of data
* @return A ByteBuffer containing the body of the message that was read
* @throws IOException
*/
ByteBuffer readMessageBody(Message message) throws IOException;
...
}
We might need to duplicate some logic in https://github.com/apache/arrow/blob/master/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageChannelReader.java#L33
For record batches, it's not too bad because the logic is pretty simple, but the downside is we will be using low-level APIs of Arrow, which might not be guaranteed to be stable. What do you think?
@BryanCutler what kind of static function do you think we need to add on the Arrow side?
I was just referring to the two static functions in the previous post. These will contain most of the low-level operations inside for stability. I'm not sure we need a new interface to handle this case; it's probably not a common use case. I'll just implement what I thought and maybe it will be clearer.
assert(arrowPayloads.length == indexData.rdd.getNumPartitions)
val arrowBatches = indexData.getArrowBatchRdd.collect()
assert(arrowBatches.nonEmpty)
assert(arrowBatches.length == indexData.rdd.getNumPartitions)
Most of these changes are just renames to be consistent
Test build #91733 has finished for PR 21546 at commit
arrowBatchRdd,
(ctx: TaskContext, it: Iterator[Array[Byte]]) => it.toArray,
0 until numPartitions,
handlePartitionBatches)
Instead of collecting all partitions back at once and holding out-of-order partitions in the driver while waiting for the in-order ones, is it better to incrementally run jobs on partitions in order and send streams to the Python side? Then we wouldn't need to hold out-of-order partitions in the driver.
+1 chunking if we could. I recall Bryan said for grouped UDFs we need the entire set.
Also not sure if on the Python side we have any assumption on how much of the partition is in each chunk (there shouldn't be?)
is it better to incrementally run jobs on partitions in order
I believe this is how toLocalIterator works, right? I tried using that because it only keeps 1 partition in memory at a time, but the performance took quite a hit from the multiple jobs. I think we should still prioritize performance over memory for toPandas() since it's assumed the data to be collected is relatively small.
I did have another idea though: we could stream all partitions to Python out of order, then follow with another small batch of data that contains a map of partitionIndex to orderReceived. Then the partitions could be put into order on the Python side before making the Pandas DataFrame.
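A rough sketch of what that could look like on the JVM side, with hypothetical writer functions (writeBatchToPythonStream, writePartitionOrder) standing in for the actual socket code; this is roughly the shape of what the follow-up #22275 quoted further down ended up describing:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical writers for the Arrow stream going to Python.
def writeBatchToPythonStream(batch: Array[Byte]): Unit = ???
def writePartitionOrder(indices: Array[Int]): Unit = ???

val arrivalOrder = ArrayBuffer[Int]()

// Result handler: called as each partition's batches arrive, in any order.
def handlePartitionBatches(partitionId: Int, batches: Array[Array[Byte]]): Unit = {
  batches.foreach(writeBatchToPythonStream)
  arrivalOrder += partitionId
}

// After all partitions have been handled, follow the data with the partition
// indices in the order they were received, so Python can restore the original order.
def finish(): Unit = writePartitionOrder(arrivalOrder.toArray)
```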
+1 chunking if we could. I recall Bryan said for grouped UDFs we need the entire set.
This still keeps Arrow record batches chunked within each partition, which can help the executor memory, but doesn't do anything for the driver side because we still need to collect the entire partition in the driver JVM.
Also not sure if on the Python side we have any assumption on how much of the partition is in each chunk (there shouldn't be?)
No, Python doesn't care how many chunks the data is in; it's handled by pyarrow.
I did have another idea though: we could stream all partitions to Python out of order, then follow with another small batch of data that contains a map of partitionIndex to orderReceived. Then the partitions could be put into order on the Python side before making the Pandas DataFrame.
This sounds good!
I guess in the worst-case scenario the driver still needs to hold all batches in memory, for example if all the batches arrive at the same time.
I wonder if there is a way to:
(1) Compute all tasks in parallel; once tasks are done, store the result in the block manager on the executors.
(2) Return all block ids to the driver.
(3) The driver fetches each block and streams it individually.
This way at least the computation is done in parallel; fetching the results sequentially is a trade-off of speed vs. memory, something we or the user can choose, but I imagine fetching some 10G-20G of data from executors sequentially shouldn't be too bad.
sqlContext: SQLContext,
filename: String,
schemaString: String): DataFrame = {
JavaSparkContext.fromSparkContext(sqlContext.sparkContext)
What is this line for?
oops, nothing! I must have forgotten to delete it, thanks!
* @param sqlContext The active [[SQLContext]].
* @return The converted [[DataFrame]].
*/
def arrowPayloadToDataFrame(
payloadRDD: JavaRDD[Array[Byte]],
def arrowStreamToDataFrame(
This seems to be a private method now?
it's public so it can be called in Python with Py4j
oh right, this is only called by the function below so I suppose we don't even need it..
* Read a file as an Arrow stream and return an RDD of serialized ArrowRecordBatches.
*/
private[sql] def readArrowStreamFromFile(sqlContext: SQLContext, filename: String):
JavaRDD[Array[Byte]] = {
indentation
val batchIter = ArrowConverters.toBatchIterator(inputRows.toIterator, schema, 5, null, ctx)

// Write batches to Arrow stream format as a byte array
val out = new ByteArrayOutputStream()
Can we use Utils.tryWithResource { ... }?
This doesn't actually need to be closed, but I should be closing the DataOutputStream, so I'll put that in tryWithResource
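For reference, the wrapped write would look roughly like this (a sketch assuming Spark's `Utils.tryWithResource` helper; `out` and `batchBytes` are only illustrative stand-ins for the surrounding code):

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}
import org.apache.spark.util.Utils

val out = new ByteArrayOutputStream()
val batchBytes: Array[Byte] = Array()  // stand-in for a serialized ArrowRecordBatch

// tryWithResource closes the DataOutputStream even if the write throws.
Utils.tryWithResource(new DataOutputStream(out)) { dataOut =>
  dataOut.write(batchBytes)
  dataOut.flush()
}
```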
done
python/pyspark/serializers.py
Outdated
@@ -184,24 +184,28 @@ def loads(self, obj):
raise NotImplementedError


class ArrowSerializer(FramedSerializer):
class ArrowSerializer(Serializer):
Should we rename this?
Maybe ArrowStreamSerializer?
cc @ueshin
retest this please
Test build #95340 has finished for PR 21546 at commit
retest this please
Test build #95363 has finished for PR 21546 at commit
Thanks for this great work!
A general question. If the Arrow stream format has a bug, can we still use the original Arrow file format? My major concern is the regression. Can we keep both in the code?
It's not a bug. Here's what's going on now: he proposed a way to write out regardless of partition order, and I suggested reverting that part (see #21546 (comment)) since that code path looked complicated and can proceed orthogonally, and could also be applied to other PySpark code paths. This one still has some improvements (see #21546 (comment)).
@gatorsmile, this is just the format for Arrow IPC between the JVM and the Python process, and although it used the Arrow file format, there is nothing persisted. There is no real reason to keep both formats; the stream format is better for our purposes and it's already what is being used for
@BryanCutler The worst case is to turn off spark.sql.execution.arrow.enabled if the new code path has a bug, right?
Yup, since
Yes, that is the worst case. If there is some bug with types/schema then there is an automatic fallback to the non-Arrow code path too.
retest this please
@HyukjinKwon I redid the benchmarks for
Test build #95390 has finished for PR 21546 at commit
Test build #95397 has finished for PR 21546 at commit
Merged to master.
Thanks @HyukjinKwon and others who reviewed!
return RDD(jrdd, self, serializer)

def _serialize_to_jvm(self, data, parallelism, serializer):
def _serialize_to_jvm(self, data, serializer, reader_func):
hi, sorry for the late review here, and more just a question for myself -- is this aspect tested at all? IIUC, it would be used in spark.createDataFrame, but the tests in session.py don't have arrow enabled, right?
not that I see a bug, mostly just wondering as I was looking at making my own changes here, and it would be nice if I knew there were some tests
(if not, I can try to address this in some other work)
Hey @squito, yes that's correct, this is in the path that the ArrowTests with createDataFrame exercise. These tests are skipped if pyarrow is not installed, but for our Jenkins tests it is installed under the Python 3.5 env so it gets tested there.
It's a little subtle to see that they were run since the test output only shows when tests are skipped. You can see that for Python 2.7 the ArrowTests show as skipped, but for 3.5 they do not.
I made https://issues.apache.org/jira/browse/SPARK-25272 which will give a more clear output that the ArrowTests were run.
To be honest, I worry about the test coverage of PySpark in general. Can anybody in PySpark lead the effort to propose a solution for improving the test coverage?
Although most parts of PySpark should be guaranteed by Spark Core and SQL, PySpark is starting to have more and more PySpark-only stuff. I am not very sure how well it is tested.
Thanks @BryanCutler , sorry I didn't know where to look for those, they look much better than what I would have added!
@gatorsmile, I'm working on a test coverage report and it's already almost finished. The only job left is to set up a Jenkins job as I mentioned. cc @shaneknapp
For now, you can run the https://github.com/apache/spark/blob/master/python/run-tests-with-coverage script to check the coverage.
…ord batches to improve performance

## What changes were proposed in this pull request?

When executing `toPandas` with Arrow enabled, partitions that arrive in the JVM out of order must be buffered before they can be sent to Python. This causes an excess of memory to be used in the driver JVM and increases the time it takes to complete because data must sit in the JVM waiting for preceding partitions to come in.

This change sends un-ordered partitions to Python as soon as they arrive in the JVM, followed by a list of partition indices so that Python can assemble the data in the correct order. This way, data is not buffered at the JVM and there is no waiting on particular partitions, so performance will be increased.

Followup to #21546

## How was this patch tested?

Added a new test with a large number of batches per partition, and a test that forces a small delay in the first partition. These test that partitions are collected out of order and then put in the correct order in Python.

## Performance Tests - toPandas

Tests run on a 4 node standalone cluster with 32 cores total, 14.04.1-Ubuntu and OpenJDK 8. Measured wall clock time to execute `toPandas()` and took the average best time of 5 runs/5 loops each.

Test code
```python
df = spark.range(1 << 25, numPartitions=32).toDF("id").withColumn("x1", rand()).withColumn("x2", rand()).withColumn("x3", rand()).withColumn("x4", rand())

for i in range(5):
    start = time.time()
    _ = df.toPandas()
    elapsed = time.time() - start
```

Spark config
```
spark.driver.memory 5g
spark.executor.memory 5g
spark.driver.maxResultSize 2g
spark.sql.execution.arrow.enabled true
```

Current Master w/ Arrow stream | This PR
---------------------|------------
5.16207 | 4.342533
5.133671 | 4.399408
5.147513 | 4.468471
5.105243 | 4.36524
5.018685 | 4.373791

Avg Master | Avg This PR
------------------|--------------
5.1134364 | 4.3898886

Speedup of **1.164821449**

Closes #22275 from BryanCutler/arrow-toPandas-oo-batches-SPARK-25274.

Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
…nection thread to propagate errors

### What changes were proposed in this pull request?

This PR proposes to backport #24834 with minimised changes, and the tests added at #25594. #24834 was not backported before because basically it targeted a better exception by propagating the exception from the JVM. However, this PR actually fixed another problem accidentally (see #25594 and [SPARK-28881](https://issues.apache.org/jira/browse/SPARK-28881)). This regression seems introduced by #21546.

Root cause is that `runJob` with `resultHandler` at https://github.com/apache/spark/blob/23bed0d3c08e03085d3f0c3a7d457eedd30bd67f/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L3370-L3384 seems able to write partial output. The JVM throws an exception but, since the JVM exception is not propagated into the Python process, the Python process doesn't know whether the exception was thrown in the JVM (it just closes the socket), which results as below:

```
./bin/pyspark --conf spark.driver.maxResultSize=1m
```

```python
spark.conf.set("spark.sql.execution.arrow.enabled",True)
spark.range(10000000).toPandas()
```

```
Empty DataFrame
Columns: [id]
Index: []
```

With this change, the Python process catches exceptions from the JVM.

### Why are the changes needed?

It returns incorrect data. And potentially it returns partial results when an exception happens on the JVM side. This is a regression. The code works fine in Spark 2.3.3.

### Does this PR introduce any user-facing change?

Yes.

```
./bin/pyspark --conf spark.driver.maxResultSize=1m
```

```python
spark.conf.set("spark.sql.execution.arrow.enabled",True)
spark.range(10000000).toPandas()
```

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../pyspark/sql/dataframe.py", line 2122, in toPandas
    batches = self._collectAsArrow()
  File "/.../pyspark/sql/dataframe.py", line 2184, in _collectAsArrow
    jsocket_auth_server.getResult()  # Join serving thread and raise any exceptions
  File "/.../lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/.../pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/.../lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o42.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
...
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1 tasks (6.5 MB) is bigger than spark.driver.maxResultSize (1024.0 KB)
```

now throws an exception as expected.

### How was this patch tested?

Manually as described above. unittest added.

Closes #25593 from HyukjinKwon/SPARK-27992.

Lead-authored-by: HyukjinKwon <gurwls223@apache.org>
Co-authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
What changes were proposed in this pull request?
This changes the calls of toPandas() and createDataFrame() to use the Arrow stream format, when Arrow is enabled. Previously, Arrow data was written to byte arrays where each chunk is an output of the Arrow file format. This was mainly due to constraints at the time, and caused some overhead by writing the schema/footer on each chunk of data and then having to read multiple Arrow file inputs and concat them together.

Using the Arrow stream format has improved these by increasing performance, lowering memory overhead for the average case, and simplifying the code. Here are the details of this change:
toPandas()
Before:
Spark internal rows are converted to the Arrow file format; each group of records is a complete Arrow file which contains the schema and other metadata. Next a collect is done and an Array of Arrow files is the result. After that each Arrow file is sent to the Python driver, which then loads each file and concats them into a single Arrow DataFrame.
After:
Spark internal rows are converted to ArrowRecordBatches directly, which is the simplest Arrow component for IPC data transfers. The driver JVM then immediately starts serving data to Python as an Arrow stream, sending the schema first. It then starts a Spark job with a custom handler that sends Arrow RecordBatches to Python. Partitions arriving in order are sent immediately, and out-of-order partitions are buffered until the ones that precede them come in. This improves performance, simplifies memory usage on executors, and improves the average memory usage on the JVM driver. Since the order of partitions must be preserved, the worst case is that the first partition will be the last to arrive, so all data must be buffered in memory until then. This case is no worse than before when doing a full collect.
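A condensed sketch of that ordering logic (illustrative names, not the exact code in this change): a partition's batches are written straight to the Python stream if the partition is the next one expected; otherwise they are buffered until the preceding partitions have been sent.

```scala
import scala.collection.mutable

// Hypothetical writer for the Arrow stream going to Python.
def writeBatchToPythonStream(batch: Array[Byte]): Unit = ???

var nextPartitionToSend = 0
val buffered = mutable.Map[Int, Array[Array[Byte]]]()

def handlePartitionBatches(partitionId: Int, batches: Array[Array[Byte]]): Unit = {
  if (partitionId == nextPartitionToSend) {
    batches.foreach(writeBatchToPythonStream)
    nextPartitionToSend += 1
    // Flush any buffered partitions that are now next in line.
    while (buffered.contains(nextPartitionToSend)) {
      buffered.remove(nextPartitionToSend).get.foreach(writeBatchToPythonStream)
      nextPartitionToSend += 1
    }
  } else {
    // Out of order: hold on to it until its predecessors have been sent.
    buffered(partitionId) = batches
  }
}
```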
createDataFrame()
Before:
A Pandas DataFrame is split into parts and each part is made into an Arrow file. Then each file is prefixed by the buffer size and written to a temp file. The temp file is read and each Arrow file is parallelized as a byte array.
After:
A Pandas DataFrame is split into parts, then an Arrow stream is written to a temp file where each part is an ArrowRecordBatch. The temp file is read as a stream and the Arrow messages are examined. If the message is an ArrowRecordBatch, the data is saved as a byte array. After reading the file, each ArrowRecordBatch is parallelized as a byte array. This has slightly more processing than before because we must look at each Arrow message to extract the record batches, but performance ends up a little better. It is cleaner in the sense that IPC from Python to the JVM is done over a single Arrow stream.
How was this patch tested?
Added new unit tests for the additions to ArrowConverters in Scala, existing tests for Python.
Performance Tests - toPandas
Tests run on a 4 node standalone cluster with 32 cores total, 14.04.1-Ubuntu and OpenJDK 8. Measured wall clock time to execute toPandas() and took the average best time of 5 runs/5 loops each.

Test code
Speedup of 1.08060595
Performance Tests - createDataFrame
Tests run on a 4 node standalone cluster with 32 cores total, 14.04.1-Ubuntu and OpenJDK 8. Measured wall clock time to execute createDataFrame() and get the first record. Took the average best time of 5 runs/5 loops each.

Test code
Speedup of 1.178064192
Memory Improvements
toPandas()
The most significant improvement is the reduction of the upper-bound space complexity in the JVM driver. Before, the entire dataset was collected in the JVM first before sending it to Python. With this change, as soon as a partition is collected, the result handler immediately sends it to Python, so the upper bound is the size of the largest partition. Also, using the Arrow stream format is more efficient because the schema is written once per stream, followed by record batches. The schema is now only sent from the driver JVM to Python. Before, multiple Arrow file formats were used that each contained the schema. This duplicated schema was created in the executors, sent to the driver JVM, and then to Python, where all but the first one received are discarded.
I verified the upper bound limit by running a test that would collect data that would exceed the amount of driver JVM memory available. Using these settings on a standalone cluster:
Test code:
This makes a total data size of 33554432 × 8 × 4 = 1073741824 bytes.
With the current master, it fails with OOM but passes using this PR.
createDataFrame()
No significant change in memory except that using the stream format instead of separate file formats avoids duplicating the schema, similar to toPandas above. The process of reading the stream and parallelizing the batches does cause the record batch message metadata to be copied, but its size is insignificant.