Skip to content

Commit 19ccf20

Browse files
committed
Revert "SPARK-1786: Edge Partition Serialization"
This reverts commit 09e7aa4.
1 parent 09e7aa4 commit 19ccf20

File tree

11 files changed

+23
-44
lines changed

11 files changed

+23
-44
lines changed

graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,6 @@ import org.apache.spark.util.BoundedPriorityQueue
2424
import org.apache.spark.util.collection.BitSet
2525

2626
import org.apache.spark.graphx.impl._
27-
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
28-
import org.apache.spark.util.collection.OpenHashSet
29-
3027

3128
/**
3229
* Registers GraphX classes with Kryo for improved performance.
@@ -46,8 +43,8 @@ class GraphKryoRegistrator extends KryoRegistrator {
4643
kryo.register(classOf[PartitionStrategy])
4744
kryo.register(classOf[BoundedPriorityQueue[Object]])
4845
kryo.register(classOf[EdgeDirection])
49-
kryo.register(classOf[GraphXPrimitiveKeyOpenHashMap[VertexId, Int]])
50-
kryo.register(classOf[OpenHashSet[Int]])
51-
kryo.register(classOf[OpenHashSet[Long]])
46+
47+
// This avoids a large number of hash table lookups.
48+
kryo.setReferences(false)
5249
}
5350
}

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.graphx.impl
2020
import scala.reflect.{classTag, ClassTag}
2121

2222
import org.apache.spark.graphx._
23-
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
23+
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
2424

2525
/**
2626
* A collection of edges stored in columnar format, along with any vertex attributes referenced. The
@@ -42,12 +42,12 @@ import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
4242
private[graphx]
4343
class EdgePartition[
4444
@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag](
45-
val srcIds: Array[VertexId] = null,
46-
val dstIds: Array[VertexId] = null,
47-
val data: Array[ED] = null,
48-
val index: GraphXPrimitiveKeyOpenHashMap[VertexId, Int] = null,
49-
val vertices: VertexPartition[VD] = null,
50-
val activeSet: Option[VertexSet] = None
45+
@transient val srcIds: Array[VertexId],
46+
@transient val dstIds: Array[VertexId],
47+
@transient val data: Array[ED],
48+
@transient val index: PrimitiveKeyOpenHashMap[VertexId, Int],
49+
@transient val vertices: VertexPartition[VD],
50+
@transient val activeSet: Option[VertexSet] = None
5151
) extends Serializable {
5252

5353
/** Return a new `EdgePartition` with the specified edge data. */

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import scala.util.Sorting
2323
import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveVector}
2424

2525
import org.apache.spark.graphx._
26-
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
26+
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
2727

2828
private[graphx]
2929
class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: ClassTag](
@@ -41,7 +41,7 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla
4141
val srcIds = new Array[VertexId](edgeArray.size)
4242
val dstIds = new Array[VertexId](edgeArray.size)
4343
val data = new Array[ED](edgeArray.size)
44-
val index = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
44+
val index = new PrimitiveKeyOpenHashMap[VertexId, Int]
4545
// Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and
4646
// adding them to the index
4747
if (edgeArray.length > 0) {

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.graphx.impl
2020
import scala.reflect.ClassTag
2121

2222
import org.apache.spark.graphx._
23-
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
23+
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
2424

2525
/**
2626
* The Iterator type returned when constructing edge triplets. This could be an anonymous class in

graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.spark.rdd.ShuffledRDD
2525
import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
2626

2727
import org.apache.spark.graphx._
28-
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
28+
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
2929

3030
/**
3131
* A message from the edge partition `pid` to the vertex partition containing `vid` specifying that
@@ -69,7 +69,7 @@ object RoutingTablePartition {
6969
: Iterator[RoutingTableMessage] = {
7070
// Determine which positions each vertex id appears in using a map where the low 2 bits
7171
// represent src and dst
72-
val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, Byte]
72+
val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
7373
edgePartition.srcIds.iterator.foreach { srcId =>
7474
map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
7575
}

graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import scala.reflect.ClassTag
2222
import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
2323

2424
import org.apache.spark.graphx._
25-
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
25+
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
2626

2727
/** Stores vertex attributes to ship to an edge partition. */
2828
private[graphx]

graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import scala.reflect.ClassTag
2222
import org.apache.spark.util.collection.BitSet
2323

2424
import org.apache.spark.graphx._
25-
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
25+
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
2626

2727
private[graphx] object VertexPartition {
2828
/** Construct a `VertexPartition` from the given vertices. */

graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import scala.reflect.ClassTag
2323
import org.apache.spark.util.collection.BitSet
2424

2525
import org.apache.spark.graphx._
26-
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
26+
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
2727

2828
private[graphx] object VertexPartitionBase {
2929
/**
@@ -32,7 +32,7 @@ private[graphx] object VertexPartitionBase {
3232
*/
3333
def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)])
3434
: (VertexIdToIndexMap, Array[VD], BitSet) = {
35-
val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
35+
val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
3636
iter.foreach { pair =>
3737
map(pair._1) = pair._2
3838
}
@@ -45,7 +45,7 @@ private[graphx] object VertexPartitionBase {
4545
*/
4646
def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: (VD, VD) => VD)
4747
: (VertexIdToIndexMap, Array[VD], BitSet) = {
48-
val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
48+
val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
4949
iter.foreach { pair =>
5050
map.setMerge(pair._1, pair._2, mergeFunc)
5151
}

graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.spark.Logging
2525
import org.apache.spark.util.collection.BitSet
2626

2727
import org.apache.spark.graphx._
28-
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
28+
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
2929

3030
/**
3131
* An class containing additional operations for subclasses of VertexPartitionBase that provide
@@ -224,7 +224,7 @@ private[graphx] abstract class VertexPartitionBaseOps
224224
* Construct a new VertexPartition whose index contains only the vertices in the mask.
225225
*/
226226
def reindex(): Self[VD] = {
227-
val hashMap = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
227+
val hashMap = new PrimitiveKeyOpenHashMap[VertexId, VD]
228228
val arbitraryMerge = (a: VD, b: VD) => a
229229
for ((k, v) <- self.iterator) {
230230
hashMap.setMerge(k, v, arbitraryMerge)
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import scala.reflect._
2929
* Under the hood, it uses our OpenHashSet implementation.
3030
*/
3131
private[graphx]
32-
class GraphXPrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
32+
class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
3333
@specialized(Long, Int, Double) V: ClassTag](
3434
val keySet: OpenHashSet[K], var _values: Array[V])
3535
extends Iterable[(K, V)]

graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,6 @@ import scala.util.Random
2222

2323
import org.scalatest.FunSuite
2424

25-
import org.apache.spark.SparkConf
26-
import org.apache.spark.serializer.KryoSerializer
27-
2825
import org.apache.spark.graphx._
2926

3027
class EdgePartitionSuite extends FunSuite {
@@ -123,19 +120,4 @@ class EdgePartitionSuite extends FunSuite {
123120
assert(!ep.isActive(-1))
124121
assert(ep.numActives == Some(2))
125122
}
126-
127-
test("Kryo serialization") {
128-
val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
129-
val a: EdgePartition[Int, Int] = makeEdgePartition(aList)
130-
val conf = new SparkConf()
131-
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
132-
.set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
133-
val s = new KryoSerializer(conf).newInstance()
134-
val aSer: EdgePartition[Int, Int] = s.deserialize(s.serialize(a))
135-
assert(aSer.srcIds.toList === a.srcIds.toList)
136-
assert(aSer.dstIds.toList === a.dstIds.toList)
137-
assert(aSer.data.toList === a.data.toList)
138-
assert(aSer.index != null)
139-
assert(aSer.vertices.iterator.toSet === a.vertices.iterator.toSet)
140-
}
141123
}

0 commit comments

Comments
 (0)