@@ -29,26 +29,28 @@ private[graphx]
2929class RoutingTableMessageSerializer extends Serializer with Serializable {
3030 override def newInstance (): SerializerInstance = new ShuffleSerializerInstance {
3131
32- override def serializeStream (s : OutputStream ) = new ShuffleSerializationStream (s) {
33- def writeObject [T ](t : T ) = {
34- val msg = t.asInstanceOf [RoutingTableMessage ]
35- writeVarLong(msg.vid, optimizePositive = false )
36- writeUnsignedVarInt(msg.pid)
37- // TODO: Write only the bottom two bits of msg.position
38- s.write(msg.position)
39- this
32+ override def serializeStream (s : OutputStream ): SerializationStream =
33+ new ShuffleSerializationStream (s) {
34+ def writeObject [T ](t : T ): SerializationStream = {
35+ val msg = t.asInstanceOf [RoutingTableMessage ]
36+ writeVarLong(msg.vid, optimizePositive = false )
37+ writeUnsignedVarInt(msg.pid)
38+ // TODO: Write only the bottom two bits of msg.position
39+ s.write(msg.position)
40+ this
41+ }
4042 }
41- }
4243
43- override def deserializeStream (s : InputStream ) = new ShuffleDeserializationStream (s) {
44- override def readObject [T ](): T = {
45- val a = readVarLong(optimizePositive = false )
46- val b = readUnsignedVarInt()
47- val c = s.read()
48- if (c == - 1 ) throw new EOFException
49- new RoutingTableMessage (a, b, c.toByte).asInstanceOf [T ]
44+ override def deserializeStream (s : InputStream ): DeserializationStream =
45+ new ShuffleDeserializationStream (s) {
46+ override def readObject [T ](): T = {
47+ val a = readVarLong(optimizePositive = false )
48+ val b = readUnsignedVarInt()
49+ val c = s.read()
50+ if (c == - 1 ) throw new EOFException
51+ new RoutingTableMessage (a, b, c.toByte).asInstanceOf [T ]
52+ }
5053 }
51- }
5254 }
5355}
5456
0 commit comments