Skip to content

Commit 8ed89a6

Browse files
Chain generators to prevent potential deadlock
1 parent 4153b02 commit 8ed89a6

File tree

1 file changed

+9
-6
lines changed

1 file changed

+9
-6
lines changed

python/pyspark/rdd.py

Lines changed: 9 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -704,12 +704,15 @@ def pipe_objs(out):
704704
out.write(s.encode('utf-8'))
705705
out.close()
706706
Thread(target=pipe_objs, args=[pipe.stdin]).start()
707-
result = (x.rstrip(b'\n').decode('utf-8') for x in iter(pipe.stdout.readline, b''))
708-
pipe.wait()
709-
if pipe.returncode:
710-
raise Exception("Pipe function `%s' exited "
711-
"with error code %d" % (command, pipe.returncode))
712-
return result
707+
def check_return_code():
708+
pipe.wait()
709+
if pipe.returncode:
710+
raise Exception("Pipe function `%s' exited "
711+
"with error code %d" % (command, pipe.returncode))
712+
else:
713+
return None
714+
return (x.rstrip(b'\n').decode('utf-8') for x in
715+
chain(iter(pipe.stdout.readline, b''), iter(check_return_code, None)))
713716
return self.mapPartitions(func)
714717

715718
def foreach(self, f):

0 commit comments

Comments
 (0)