
Commit 27bb40b

lrz and HyukjinKwon committed
[SPARK-33339][PYTHON] Pyspark application will hang due to non Exception error
### What changes were proposed in this pull request?

When a SystemExit is raised while processing a task, the Python worker exits abnormally, but the executor task keeps waiting to read from the worker's socket, causing the application to hang. The SystemExit may come from the user's own code, but Spark should at least surface an error to the user instead of getting stuck.

A simple test to reproduce the case:

```
from pyspark.sql import SparkSession

def err(line):
    raise SystemExit

spark = SparkSession.builder.appName("test").getOrCreate()
spark.sparkContext.parallelize(range(1, 2), 2).map(err).collect()
spark.stop()
```

### Why are the changes needed?

To make sure a PySpark application won't hang if a non-Exception error is raised in the Python worker.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a new test and also manually tested the case above.

Closes #30248 from li36909/pyspark.

Lead-authored-by: lrz <lrz@lrzdeMacBook-Pro.local>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
1 parent: e3a768d · commit: 27bb40b

File tree

2 files changed: +11 −2 lines changed

python/pyspark/tests/test_worker.py

Lines changed: 9 additions & 0 deletions

```diff
@@ -95,6 +95,15 @@ def raise_exception(_):
         self.assertRaises(Exception, lambda: rdd.foreach(raise_exception))
         self.assertEqual(100, rdd.map(str).count())

+    def test_after_non_exception_error(self):
+        # SPARK-33339: Pyspark application will hang due to non Exception
+        def raise_system_exit(_):
+            raise SystemExit()
+        rdd = self.sc.parallelize(range(100), 1)
+        with QuietTest(self.sc):
+            self.assertRaises(Exception, lambda: rdd.foreach(raise_system_exit))
+        self.assertEqual(100, rdd.map(str).count())
+
     def test_after_jvm_exception(self):
         tempFile = tempfile.NamedTemporaryFile(delete=False)
         tempFile.write(b"Hello World!")
```

python/pyspark/worker.py

Lines changed: 2 additions & 2 deletions

```diff
@@ -604,7 +604,7 @@ def process():
         # reuse.
         TaskContext._setTaskContext(None)
         BarrierTaskContext._setTaskContext(None)
-    except Exception:
+    except BaseException:
         try:
             exc_info = traceback.format_exc()
             if isinstance(exc_info, bytes):
@@ -618,7 +618,7 @@ def process():
     except IOError:
         # JVM close the socket
         pass
-    except Exception:
+    except BaseException:
         # Write the error to stderr if it happened while serializing
         print("PySpark worker failed with exception:", file=sys.stderr)
         print(traceback.format_exc(), file=sys.stderr)
```
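The one-line change above works because SystemExit (like KeyboardInterrupt) derives from BaseException rather than Exception, so an `except Exception` handler never sees it and the worker process dies without reporting the error back over the socket. A minimal sketch (not Spark code; `run_with` is a hypothetical helper) illustrating the difference:

```python
def run_with(handler):
    """Raise SystemExit inside a try block and report whether `handler` catches it."""
    try:
        raise SystemExit("worker exiting")
    except handler:
        return "caught"

# `except Exception` does not match SystemExit, so it escapes run_with().
try:
    result_exception = run_with(Exception)
except SystemExit:
    result_exception = "escaped"

# `except BaseException` matches SystemExit (the root of the exception hierarchy).
result_base = run_with(BaseException)

print(result_exception, result_base)  # escaped caught
```

This is why the fix widens only the worker's outermost handlers: the error is serialized back to the JVM (or printed to stderr) before the process exits, instead of leaving the executor blocked on the socket read.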

0 commit comments