Skip to content

Commit

Permalink
Revert "[SPARK-5363] [PySpark] check ending mark in non-block way"
Browse files — browse the repository at this point in the history
This reverts commits ac6fe67 and c06e42f.
Branch information:
JoshRosen committed Feb 17, 2015
1 parent a65766b commit ee6e3ef
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 18 deletions.
21 changes: 4 additions & 17 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -144,24 +144,11 @@ private[spark] class PythonRDD(
stream.readFully(update)
accumulator += Collections.singletonList(update)
}

// Check whether the worker is ready to be re-used.
if (reuse_worker) {
// It has a high possibility that the ending mark is already available,
// And current task should not be blocked by checking it

if (stream.available() >= 4) {
val ending = stream.readInt()
if (ending == SpecialLengths.END_OF_STREAM) {
env.releasePythonWorker(pythonExec, envVars.toMap, worker)
released = true
logInfo(s"Communication with worker ended cleanly, re-use it: $worker")
} else {
logInfo(s"Communication with worker did not end cleanly " +
s"(ending with $ending), close it: $worker")
}
} else {
logInfo(s"The ending mark from worker is not available, close it: $worker")
if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
if (reuse_worker) {
env.releasePythonWorker(pythonExec, envVars.toMap, worker)
released = true
}
}
null
Expand Down
1 change: 0 additions & 1 deletion python/pyspark/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ def process():
write_int(len(_accumulatorRegistry), outfile)
for (aid, accum) in _accumulatorRegistry.items():
pickleSer._write_with_length((aid, accum._value), outfile)
outfile.flush()

# check end of stream
if read_int(infile) == SpecialLengths.END_OF_STREAM:
Expand Down

0 comments on commit ee6e3ef

Please sign in to comment.