Skip to content

Commit dd71ba9

Browse files
author
Davies Liu
committed
be safe
1 parent 8dc1adf commit dd71ba9

File tree

3 files changed

+7
-7
lines changed

3 files changed

+7
-7
lines changed

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -86,7 +86,7 @@ private[spark] case class ChainedPythonFunctions(funcs: Seq[PythonFunction])
86 | 86 |   private[spark] object PythonRunner {
87 | 87 |     def apply(func: PythonFunction, bufferSize: Int, reuse_worker: Boolean): PythonRunner = {
88 | 88 |       new PythonRunner(
89 |    | -       Seq(ChainedPythonFunctions(Seq(func))), bufferSize, reuse_worker, false, Seq(Seq(0)))
   | 89 | +       Seq(ChainedPythonFunctions(Seq(func))), bufferSize, reuse_worker, false, Array(Array(0)))
90 | 90 |     }
91 | 91 |   }
92 | 92 |
@@ -101,7 +101,7 @@ private[spark] class PythonRunner(
101 | 101 |       bufferSize: Int,
102 | 102 |       reuse_worker: Boolean,
103 | 103 |       isUDF: Boolean,
104 |     | -     argOffsets: Seq[Seq[Int]])
    | 104 | +     argOffsets: Array[Array[Int]])
105 | 105 |     extends Logging {
106 | 106 |
107 | 107 |     require(funcs.length == argOffsets.length, "numArgs should have the same length as funcs")

python/pyspark/worker.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,7 @@
29 | 29 |   from pyspark.broadcast import Broadcast, _broadcastRegistry
30 | 30 |   from pyspark.files import SparkFiles
31 | 31 |   from pyspark.serializers import write_with_length, write_int, read_long, \
32 |    | -     write_long, read_int, SpecialLengths, UTF8Deserializer, PickleSerializer, AutoBatchedSerializer
   | 32 | +     write_long, read_int, SpecialLengths, UTF8Deserializer, PickleSerializer, BatchedSerializer
33 | 33 |   from pyspark import shuffle
34 | 34 |
35 | 35 |   pickleSer = PickleSerializer()
@@ -101,7 +101,7 @@ def read_udfs(pickleSer, infile):
101 | 101 |       mapper = eval(mapper_str, udfs)
102 | 102 |
103 | 103 |       func = lambda _, it: map(mapper, it)
104 |     | -     ser = AutoBatchedSerializer(PickleSerializer())
    | 104 | +     ser = BatchedSerializer(PickleSerializer(), 100)
105 | 105 |       # profiling is not supported for UDF
106 | 106 |       return func, None, ser, ser
107 | 107 |
sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchPythonEvaluation.scala

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -86,13 +86,13 @@ case class BatchPythonEvaluation(udfs: Seq[PythonUDF], output: Seq[Attribute], c
86 | 86 |         dataTypes += e.dataType
87 | 87 |         allInputs.length - 1
88 | 88 |       }
89 |    | -   }
90 |    | - }
   | 89 | +   }.toArray
   | 90 | + }.toArray
91 | 91 |   val projection = newMutableProjection(allInputs, child.output)()
92 | 92 |
93 | 93 |   // Input iterator to Python: input rows are grouped so we send them in batches to Python.
94 | 94 |   // For each row, add it to the queue.
95 |    | - val inputIterator = iter.grouped(1024).map { inputRows =>
   | 95 | + val inputIterator = iter.grouped(100).map { inputRows =>
96 | 96 |     val toBePickled = inputRows.map { inputRow =>
97 | 97 |       queue.add(inputRow)
98 | 98 |       val row = projection(inputRow)

0 commit comments

Comments
 (0)