Skip to content

Commit 23cdc5b

Browse files
committed
review commit
1 parent ada4fba commit 23cdc5b

File tree

2 files changed

+15
-17
lines changed

2 files changed

+15
-17
lines changed

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@
1717

1818
package org.apache.spark.broadcast
1919

20-
import java.io._
20+
import java.io.{ByteArrayOutputStream, ByteArrayInputStream, InputStream,
21+
ObjectInputStream, ObjectOutputStream, OutputStream}
2122

2223
import scala.reflect.ClassTag
2324
import scala.util.Random
2425

25-
import org.apache.spark.io.CompressionCodec
2626
import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
27+
import org.apache.spark.io.CompressionCodec
2728
import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
2829

2930
/**
@@ -233,14 +234,8 @@ private[broadcast] object TorrentBroadcast extends Logging {
233234
}
234235

235236
def blockifyObject[T: ClassTag](obj: T): TorrentInfo = {
236-
val bos =new ByteArrayOutputStream()
237-
val out: OutputStream = {
238-
if (compress) {
239-
compressionCodec.compressedOutputStream(bos)
240-
} else {
241-
bos
242-
}
243-
}
237+
val bos = new ByteArrayOutputStream()
238+
val out: OutputStream = if (compress) compressionCodec.compressedOutputStream(bos) else bos
244239
val ser = SparkEnv.get.serializer.newInstance()
245240
val serOut = ser.serializeStream(out)
246241
serOut.writeObject[T](obj).close()
@@ -281,11 +276,8 @@ private[broadcast] object TorrentBroadcast extends Logging {
281276
}
282277

283278
val in: InputStream = {
284-
if (compress) {
285-
compressionCodec.compressedInputStream(new ByteArrayInputStream(retByteArray))
286-
} else {
287-
new ByteArrayInputStream(retByteArray)
288-
}
279+
val arrIn = new ByteArrayInputStream(retByteArray)
280+
if (compress) compressionCodec.compressedInputStream(arrIn) else arrIn
289281
}
290282
val ser = SparkEnv.get.serializer.newInstance()
291283
val serIn = ser.deserializeStream(in)

core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,10 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
4444

4545
test("Accessing HttpBroadcast variables in a local cluster") {
4646
val numSlaves = 4
47-
sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", httpConf)
47+
val conf = httpConf.clone
48+
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
49+
conf.set("spark.broadcast.compress", "true")
50+
sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", conf)
4851
val list = List[Int](1, 2, 3, 4)
4952
val broadcast = sc.broadcast(list)
5053
val results = sc.parallelize(1 to numSlaves).map(x => (x, broadcast.value.sum))
@@ -69,7 +72,10 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
6972

7073
test("Accessing TorrentBroadcast variables in a local cluster") {
7174
val numSlaves = 4
72-
sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", torrentConf)
75+
val conf = torrentConf.clone
76+
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
77+
conf.set("spark.broadcast.compress", "true")
78+
sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", conf)
7379
val list = List[Int](1, 2, 3, 4)
7480
val broadcast = sc.broadcast(list)
7581
val results = sc.parallelize(1 to numSlaves).map(x => (x, broadcast.value.sum))

0 commit comments

Comments
 (0)