[SPARK-27992][SPARK-28881][PYTHON][2.4] Allow Python to join with connection thread to propagate errors #25593
Conversation
cc @cloud-fan, @gatorsmile, @BryanCutler
I am going to add a test against SPARK-28881 into the master branch first, and then port the test here to target branch-2.4.
Those partial results can actually be reproduced, for instance as below:

```python
spark.conf.set("spark.sql.execution.arrow.enabled", True)
spark.range(0, 330000, 1, 100).toPandas()
```
So, do we need to merge #25594 first to verify this PR, @HyukjinKwon?
Yes, please.
retest this please
BTW, is this the last one on the Python side, @HyukjinKwon? Do we have more blockers in PySpark?
From what I know, this is the last one. Actually, this was reported yesterday; I am rushing the fixes a bit to unblock another RC.
Thank you for reporting and making the PR swiftly. We can cut another RC, but I still haven't received any feedback from the SparkR side. Actually, that's a downside of early …
Between Spark 2.4.3 and RC 2.4.4, there looks to be only one fix:
Wow. Thank you for the confirmation. 😄
…row optimization throws an exception per maxResultSize

### What changes were proposed in this pull request?

This PR proposes to add a test case for:

```bash
./bin/pyspark --conf spark.driver.maxResultSize=1m
```

```python
spark.conf.set("spark.sql.execution.arrow.enabled", True)
spark.range(10000000).toPandas()
```

```
Empty DataFrame
Columns: [id]
Index: []
```

which can result in partial results (see #25593 (comment)). This regression was introduced between Spark 2.3 and Spark 2.4, and was accidentally fixed in the master branch.

### Why are the changes needed?

To prevent the same regression in the future.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

A test was added.

Closes #25594 from HyukjinKwon/SPARK-28881.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
retest this please
.config("spark.driver.maxResultSize", "10k") \ | ||
.getOrCreate() | ||
|
||
# Explicitly enable Arrow and disable fallback. |
just to double-check, this test fails even if we do not set these 2 configs, right?
Only one configuration, `spark.sql.execution.arrow.enabled`, matters here, because the partial results are produced from the Arrow-optimized code path. `spark.sql.execution.arrow.fallback.enabled` is just to make sure we only test the Arrow-optimized code path.
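For reference, a minimal sketch of what the two configurations do in such a test (the session setup and the row count are illustrative, not the exact test in this PR):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[4]").getOrCreate()

# Force the Arrow-optimized path and disable the silent fallback to the
# non-Arrow path, so the test exercises only the code path under test.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "false")

pdf = spark.range(10).toPandas()  # goes through Arrow serialization
assert len(pdf) == 10

spark.stop()
```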
Yep. In branch-2.4, `spark.sql.execution.arrow.enabled` is `false` by default. I verified manually that this test doesn't fail if we remove these configurations, so they are required in branch-2.4.
```python
@classmethod
def setUpClass(cls):
    cls.spark = SparkSession(SparkContext(
        'local[4]', cls.__name__, conf=SparkConf().set("spark.driver.maxResultSize", "10k")))
```
Okay, the last test failure looks weird and flaky (#25593 (comment)). This test itself passed, but it seems the previously set `spark.driver.maxResultSize=10k` affects the other tests even though I stop the session and context explicitly.

This is fine for now in the master branch because this test is in a separate file and launched in a separate process; however, it is potentially an issue.

Since the issue only happens when `SparkSession.builder` is used, I am working around it in branch-2.4 specifically by using `SparkSession(SparkContext(...))` for now, as it's an orthogonal issue.
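A rough sketch of that workaround as a test class (the class name is illustrative and the tearDownClass body is an assumption about the cleanup, not copied from this PR):

```python
import unittest

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession


class MaxResultSizeTests(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        # Build the session directly from a SparkContext instead of using
        # SparkSession.builder, so the 10k maxResultSize conf cannot leak into
        # sessions that other tests later obtain through the builder.
        cls.spark = SparkSession(SparkContext(
            'local[4]', cls.__name__,
            conf=SparkConf().set("spark.driver.maxResultSize", "10k")))

    @classmethod
    def tearDownClass(cls):
        # Assumed cleanup: stopping the session also stops the underlying context.
        cls.spark.stop()
```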
Are we going to do the same change for the master branch later?
We can, although it's fine in master for now, as I described. Let me make a follow-up later to match.
+1, LGTM.
Test build #109814 has finished for PR 25593 at commit
+1, the changes look good to me
Test build #109817 has finished for PR 25593 at commit
…nection thread to propagate errors

### What changes were proposed in this pull request?

This PR proposes to backport #24834 with minimised changes, plus the tests added at #25594.

#24834 was not backported before because it basically targeted a better exception by propagating the exception from the JVM. However, that PR actually fixed another problem accidentally (see #25594 and [SPARK-28881](https://issues.apache.org/jira/browse/SPARK-28881)). This regression seems to have been introduced by #21546.

The root cause is that `runJob` with `resultHandler` at https://github.com/apache/spark/blob/23bed0d3c08e03085d3f0c3a7d457eedd30bd67f/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L3370-L3384 seems able to write partial output. The JVM throws an exception but, since the JVM exception is not propagated to the Python process, the Python process doesn't know whether an exception was thrown on the JVM side (it just sees the socket close), which results in the following:

```
./bin/pyspark --conf spark.driver.maxResultSize=1m
```

```python
spark.conf.set("spark.sql.execution.arrow.enabled", True)
spark.range(10000000).toPandas()
```

```
Empty DataFrame
Columns: [id]
Index: []
```

With this change, the Python process catches exceptions from the JVM.

### Why are the changes needed?

It returns incorrect data, and it can potentially return partial results when an exception happens on the JVM side. This is a regression; the code works fine in Spark 2.3.3.

### Does this PR introduce any user-facing change?

Yes.

```
./bin/pyspark --conf spark.driver.maxResultSize=1m
```

```python
spark.conf.set("spark.sql.execution.arrow.enabled", True)
spark.range(10000000).toPandas()
```

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../pyspark/sql/dataframe.py", line 2122, in toPandas
    batches = self._collectAsArrow()
  File "/.../pyspark/sql/dataframe.py", line 2184, in _collectAsArrow
    jsocket_auth_server.getResult()  # Join serving thread and raise any exceptions
  File "/.../lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/.../pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/.../lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o42.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
...
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1 tasks (6.5 MB) is bigger than spark.driver.maxResultSize (1024.0 KB)
```

It now throws an exception as expected.

### How was this patch tested?

Manually, as described above. A unit test was added.

Closes #25593 from HyukjinKwon/SPARK-27992.

Lead-authored-by: HyukjinKwon <gurwls223@apache.org>
Co-authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Thank you, @HyukjinKwon, @cloud-fan, @BryanCutler.
…(...)) to prevent for Spark conf to affect other tests

### What changes were proposed in this pull request?

This PR proposes to match the test with branch-2.4. See #25593 (comment).

It seems that using `SparkSession.builder` with a Spark conf can possibly affect other tests.

### Why are the changes needed?

To match with branch-2.4 and to make backporting easier.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

The test was fixed.

Closes #25603 from HyukjinKwon/SPARK-28881-followup.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
```scala
 * Create a socket server object and background thread to execute the writeFunc
 * with the given OutputStream.
 *
 * This is the same as serveToStream, only it returns a server object that
```
Nit: only -> but
Can we update the comment of serveToStream in the master branch? This might be a common mistake if the contributors are not aware of the trap.
I think the comment was updated in the PR against the master branch: https://github.com/apache/spark/pull/24834/files#diff-1f54938136d72cd234ae55003c91d565R111-R122
```scala
@@ -957,3 +980,17 @@ private[spark] class PythonParallelizeServer(sc: SparkContext, parallelism: Int)
  }
}

/**
 * Create a socket server class and run user function on the socket in a background thread.
 * This is the same as calling SocketAuthServer.setupOneConnectionServer except it creates
```
SocketAuthServer.setupOneConnectionServer sets the timeout to 15 seconds. This one does not set it. What was the reason we set it in the past?
BTW, there are multiple places with such a hardcoded timeout, e.g. `serverSocket.setSoTimeout(10000)`. I suspect it won't be a major issue.
FYI, @HyukjinKwon and @cloud-fan also found that the timeout value only affects `accept()`.
Hi, @gatorsmile. Do you mean it affects the ongoing 2.4.4 vote?
Sorry, I think I'm a bit lost in the discussion -- @gatorsmile, do you think something is wrong here or not? It seems OK to me; the timeout is the same as before, just in a different spot.
I don't think the timeout is crucial for correctness; it's more about getting sane errors if there is some bug and nothing connects back. Rather than having things block forever, you'll get a timeout exception.
IIUC the timeout is for establishing the socket connection. Given that we build a local socket connection between the JVM and Python, 10 seconds is fine.
And I agree with @squito that it's not about correctness: if something weird happens, users will get an error instead of a wrong result.
Yea, it's fine. I don't think there's any major issue with that.
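To illustrate the conclusion above with a plain-Python analogy (not the Spark code itself): a timeout set on the listening socket only bounds how long we wait in `accept()` for the peer to connect back; the established connection is not bounded by it unless its own timeout is set separately.

```python
import socket

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))
server.listen(1)
server.settimeout(15)  # analogous to the 15-second accept timeout discussed above

try:
    conn, _ = server.accept()   # raises socket.timeout if nothing connects within 15s
except socket.timeout:
    server.close()
    raise RuntimeError("nothing connected back; fail fast instead of hanging forever")

conn.settimeout(None)  # the data transfer itself is not bounded by the accept timeout
data = conn.recv(8192)
conn.close()
server.close()
```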
What changes were proposed in this pull request?
This PR proposes to backport #24834 with minimised changes, and the tests added at #25594.
#24834 was not backported before because it basically targeted a better exception by propagating the exception from the JVM.
However, that PR actually fixed another problem accidentally (see #25594 and SPARK-28881). This regression seems to have been introduced by #21546.
The root cause is that `runJob` with `resultHandler` in `sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala` (lines 3370 to 3384 at commit 23bed0d) seems able to write partial output. The JVM throws an exception but, since the JVM exception is not propagated to the Python process, the Python process doesn't know whether an exception was thrown on the JVM side (it just sees the socket close), which results in the following:
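```
./bin/pyspark --conf spark.driver.maxResultSize=1m
```

```python
spark.conf.set("spark.sql.execution.arrow.enabled", True)
spark.range(10000000).toPandas()
```

```
Empty DataFrame
Columns: [id]
Index: []
```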
With this change, the Python process catches exceptions from the JVM.
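A minimal sketch of that mechanism, loosely based on the `_collectAsArrow` frames in the traceback shown under the user-facing change section below (helper names such as `read_batches_from_socket` are placeholders, not the literal `dataframe.py` code):

```python
def collect_as_arrow(jdf):
    # The JVM now returns a handle to the serving thread (jsocket_auth_server)
    # along with the connection info, instead of only the port and secret.
    port, auth_secret, jsocket_auth_server = jdf.collectAsArrowToPython()
    batches = []
    try:
        batches = read_batches_from_socket(port, auth_secret)  # hypothetical reader
    finally:
        # Join the serving thread. If the JVM job failed (e.g. it exceeded
        # spark.driver.maxResultSize), this re-raises the JVM exception in
        # Python instead of silently returning the partial batches read so far.
        jsocket_auth_server.getResult()
    return batches
```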
Why are the changes needed?
It returns incorrect data, and it can potentially return partial results when an exception happens on the JVM side. This is a regression; the code works fine in Spark 2.3.3.
Does this PR introduce any user-facing change?
Yes.
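Running the same reproduction:

```
./bin/pyspark --conf spark.driver.maxResultSize=1m
```

```python
spark.conf.set("spark.sql.execution.arrow.enabled", True)
spark.range(10000000).toPandas()
```

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../pyspark/sql/dataframe.py", line 2122, in toPandas
    batches = self._collectAsArrow()
  File "/.../pyspark/sql/dataframe.py", line 2184, in _collectAsArrow
    jsocket_auth_server.getResult()  # Join serving thread and raise any exceptions
  File "/.../lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/.../pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/.../lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o42.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
...
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1 tasks (6.5 MB) is bigger than spark.driver.maxResultSize (1024.0 KB)
```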
It now throws an exception as expected.
How was this patch tested?
Manually, as described above. A unit test was added.