Skip to content

Commit a2a8873

Browse files
10110346srowen
authored andcommitted
[SPARK-25786][CORE] If the ByteBuffer.hasArray is false , it will throw UnsupportedOperationException for Kryo
`deserialize` for kryo, the type of input parameter is ByteBuffer, if it is not backed by an accessible byte array. it will throw `UnsupportedOperationException` Exception Info: ``` java.lang.UnsupportedOperationException was thrown. java.lang.UnsupportedOperationException at java.nio.ByteBuffer.array(ByteBuffer.java:994) at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:362) ``` Added a unit test Closes #22779 from 10110346/InputStreamKryo. Authored-by: liuxian <liu.xian3@zte.com.cn> Signed-off-by: Sean Owen <sean.owen@databricks.com> (cherry picked from commit 7f5f7a9) Signed-off-by: Sean Owen <sean.owen@databricks.com>
1 parent 14d501b commit a2a8873

File tree

2 files changed

+25
-3
lines changed

2 files changed

+25
-3
lines changed

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import org.apache.spark.internal.Logging
4141
import org.apache.spark.network.util.ByteUnit
4242
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
4343
import org.apache.spark.storage._
44-
import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf, Utils}
44+
import org.apache.spark.util.{BoundedPriorityQueue, ByteBufferInputStream, SerializableConfiguration, SerializableJobConf, Utils}
4545
import org.apache.spark.util.collection.CompactBuffer
4646

4747
/**
@@ -358,7 +358,12 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boole
358358
override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
359359
val kryo = borrowKryo()
360360
try {
361-
input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
361+
if (bytes.hasArray) {
362+
input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
363+
} else {
364+
input.setBuffer(new Array[Byte](4096))
365+
input.setInputStream(new ByteBufferInputStream(bytes))
366+
}
362367
kryo.readClassAndObject(input).asInstanceOf[T]
363368
} finally {
364369
releaseKryo(kryo)
@@ -370,7 +375,12 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boole
370375
val oldClassLoader = kryo.getClassLoader
371376
try {
372377
kryo.setClassLoader(loader)
373-
input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
378+
if (bytes.hasArray) {
379+
input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
380+
} else {
381+
input.setBuffer(new Array[Byte](4096))
382+
input.setInputStream(new ByteBufferInputStream(bytes))
383+
}
374384
kryo.readClassAndObject(input).asInstanceOf[T]
375385
} finally {
376386
kryo.setClassLoader(oldClassLoader)

core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.serializer
1919

2020
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, FileInputStream, FileOutputStream}
21+
import java.nio.ByteBuffer
2122

2223
import scala.collection.JavaConverters._
2324
import scala.collection.mutable
@@ -497,6 +498,17 @@ class KryoSerializerAutoResetDisabledSuite extends SparkFunSuite with SharedSpar
497498
deserializationStream.close()
498499
assert(serInstance.deserialize[Any](helloHello) === ((hello, hello)))
499500
}
501+
502+
test("SPARK-25786: ByteBuffer.array -- UnsupportedOperationException") {
503+
val serInstance = new KryoSerializer(conf).newInstance().asInstanceOf[KryoSerializerInstance]
504+
val obj = "UnsupportedOperationException"
505+
val serObj = serInstance.serialize(obj)
506+
val byteBuffer = ByteBuffer.allocateDirect(serObj.array().length)
507+
byteBuffer.put(serObj.array())
508+
byteBuffer.flip()
509+
assert(serInstance.deserialize[Any](serObj) === (obj))
510+
assert(serInstance.deserialize[Any](byteBuffer) === (obj))
511+
}
500512
}
501513

502514
class ClassLoaderTestingObject

0 commit comments

Comments
 (0)