Skip to content

Commit e3b2fb6

Browse files
committed
Added acceptance of the empty case
1 parent ff356e2 commit e3b2fb6

File tree

1 file changed

+9
-4
lines changed

1 file changed

+9
-4
lines changed

core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,10 @@ private[spark] object SerDeUtil extends Logging {
199199
* representation is serialized
200200
*/
201201
def pairRDDToPython(rdd: RDD[(Any, Any)], batchSize: Int): RDD[Array[Byte]] = {
202-
val (keyFailed, valueFailed) = checkPickle(rdd.first())
202+
val (keyFailed, valueFailed) = rdd.take(1) match {
203+
case Array() => (false, false)
204+
case Array(first) => checkPickle(first)
205+
}
203206

204207
rdd.mapPartitions { iter =>
205208
val cleaned = iter.map { case (k, v) =>
@@ -226,10 +229,12 @@ private[spark] object SerDeUtil extends Logging {
226229
}
227230

228231
val rdd = pythonToJava(pyRDD, batched).rdd
229-
rdd.first match {
230-
case obj if isPair(obj) =>
232+
rdd.take(1) match {
233+
case Array(obj) if isPair(obj) =>
231234
// we only accept (K, V)
232-
case other => throw new SparkException(
235+
case Array() =>
236+
// we also accept empty collections
237+
case Array(other) => throw new SparkException(
233238
s"RDD element of type ${other.getClass.getName} cannot be used")
234239
}
235240
rdd.map { obj =>

0 commit comments

Comments
 (0)