Skip to content

Commit 712679a

Browse files
Davies Liumengxr
Davies Liu
authored andcommitted
[SPARK-6294] fix hang when call take() in JVM on PythonRDD
The Thread.interrupt() can not terminate the thread in some cases, so we should not wait for the writerThread of PythonRDD. This PR also ignore some exception during clean up. cc JoshRosen mengxr Author: Davies Liu <davies@databricks.com> Closes #4987 from davies/fix_take and squashes the following commits: 4488f1a [Davies Liu] fix hang when call take() in JVM on PythonRDD
1 parent 25b71d8 commit 712679a

File tree

3 files changed

+15
-4
lines changed

3 files changed

+15
-4
lines changed

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ private[spark] class PythonRDD(
7676

7777
context.addTaskCompletionListener { context =>
7878
writerThread.shutdownOnTaskCompletion()
79-
writerThread.join()
8079
if (!reuse_worker || !released) {
8180
try {
8281
worker.close()
@@ -248,13 +247,17 @@ private[spark] class PythonRDD(
248247
} catch {
249248
case e: Exception if context.isCompleted || context.isInterrupted =>
250249
logDebug("Exception thrown after task completion (likely due to cleanup)", e)
251-
Utils.tryLog(worker.shutdownOutput())
250+
if (!worker.isClosed) {
251+
Utils.tryLog(worker.shutdownOutput())
252+
}
252253

253254
case e: Exception =>
254255
// We must avoid throwing exceptions here, because the thread uncaught exception handler
255256
// will kill the whole executor (see org.apache.spark.executor.Executor).
256257
_exception = e
257-
Utils.tryLog(worker.shutdownOutput())
258+
if (!worker.isClosed) {
259+
Utils.tryLog(worker.shutdownOutput())
260+
}
258261
} finally {
259262
// Release memory used by this thread for shuffles
260263
env.shuffleMemoryManager.releaseMemoryForThisThread()

python/pyspark/daemon.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,10 @@ def worker(sock):
6161
except SystemExit as exc:
6262
exit_code = compute_real_exit_code(exc.code)
6363
finally:
64-
outfile.flush()
64+
try:
65+
outfile.flush()
66+
except Exception:
67+
pass
6568
return exit_code
6669

6770

python/pyspark/tests.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -782,6 +782,11 @@ def test_narrow_dependency_in_join(self):
782782
jobId = tracker.getJobIdsForGroup("test4")[0]
783783
self.assertEqual(3, len(tracker.getJobInfo(jobId).stageIds))
784784

785+
# Regression test for SPARK-6294
786+
def test_take_on_jrdd(self):
787+
rdd = self.sc.parallelize(range(1 << 20)).map(lambda x: str(x))
788+
rdd._jrdd.first()
789+
785790

786791
class ProfilerTests(PySparkTestCase):
787792

0 commit comments

Comments
 (0)