Skip to content

Commit fb91792

Browse files
committed
org.apache.spark.broadcast.TorrentBroadcast does use the serializer class specified in the spark option "spark.serializer"
1 parent b7c89a7 commit fb91792

File tree

1 file changed

+14
-6
lines changed

1 file changed

+14
-6
lines changed

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,13 @@
1717

1818
package org.apache.spark.broadcast
1919

20-
import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectOutputStream}
20+
import java.io._
2121

2222
import scala.reflect.ClassTag
2323
import scala.util.Random
2424

2525
import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
2626
import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
27-
import org.apache.spark.util.Utils
2827

2928
/**
3029
* A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
@@ -228,8 +227,12 @@ private[broadcast] object TorrentBroadcast extends Logging {
228227
initialized = false
229228
}
230229

231-
def blockifyObject[T](obj: T): TorrentInfo = {
232-
val byteArray = Utils.serialize[T](obj)
230+
def blockifyObject[T: ClassTag](obj: T): TorrentInfo = {
231+
val bos = new ByteArrayOutputStream()
232+
val ser = SparkEnv.get.serializer.newInstance()
233+
val serOut = ser.serializeStream(bos)
234+
serOut.writeObject[T](obj).close()
235+
val byteArray = bos.toByteArray
233236
val bais = new ByteArrayInputStream(byteArray)
234237

235238
var blockNum = byteArray.length / BLOCK_SIZE
@@ -255,7 +258,7 @@ private[broadcast] object TorrentBroadcast extends Logging {
255258
info
256259
}
257260

258-
def unBlockifyObject[T](
261+
def unBlockifyObject[T: ClassTag](
259262
arrayOfBlocks: Array[TorrentBlock],
260263
totalBytes: Int,
261264
totalBlocks: Int): T = {
@@ -264,7 +267,12 @@ private[broadcast] object TorrentBroadcast extends Logging {
264267
System.arraycopy(arrayOfBlocks(i).byteArray, 0, retByteArray,
265268
i * BLOCK_SIZE, arrayOfBlocks(i).byteArray.length)
266269
}
267-
Utils.deserialize[T](retByteArray, Thread.currentThread.getContextClassLoader)
270+
val in = new ByteArrayInputStream(retByteArray)
271+
val ser = SparkEnv.get.serializer.newInstance()
272+
val serIn = ser.deserializeStream(in)
273+
val obj = serIn.readObject[T]()
274+
serIn.close()
275+
obj
268276
}
269277

270278
/**

0 commit comments

Comments
 (0)