Skip to content

[SPARK-3649] Remove GraphX custom serializers #2503

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx.impl.RoutingTablePartition
import org.apache.spark.graphx.impl.ShippableVertexPartition
import org.apache.spark.graphx.impl.VertexAttributeBlock
import org.apache.spark.graphx.impl.RoutingTableMessageRDDFunctions._
import org.apache.spark.graphx.impl.VertexRDDFunctions._

/**
* Extends `RDD[(VertexId, VD)]` by ensuring that there is only one entry for each vertex and by
Expand Down Expand Up @@ -233,7 +231,7 @@ class VertexRDD[@specialized VD: ClassTag](
case _ =>
this.withPartitionsRDD[VD3](
partitionsRDD.zipPartitions(
other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) {
other.partitionBy(this.partitioner.get), preservesPartitioning = true) {
(partIter, msgs) => partIter.map(_.leftJoin(msgs)(f))
}
)
Expand Down Expand Up @@ -277,7 +275,7 @@ class VertexRDD[@specialized VD: ClassTag](
case _ =>
this.withPartitionsRDD(
partitionsRDD.zipPartitions(
other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) {
other.partitionBy(this.partitioner.get), preservesPartitioning = true) {
(partIter, msgs) => partIter.map(_.innerJoin(msgs)(f))
}
)
Expand All @@ -297,7 +295,7 @@ class VertexRDD[@specialized VD: ClassTag](
*/
def aggregateUsingIndex[VD2: ClassTag](
messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
val shuffled = messages.copartitionWithVertices(this.partitioner.get)
val shuffled = messages.partitionBy(this.partitioner.get)
val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
}
Expand Down Expand Up @@ -371,7 +369,7 @@ object VertexRDD {
def apply[VD: ClassTag](vertices: RDD[(VertexId, VD)]): VertexRDD[VD] = {
val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match {
case Some(p) => vertices
case None => vertices.copartitionWithVertices(new HashPartitioner(vertices.partitions.size))
case None => vertices.partitionBy(new HashPartitioner(vertices.partitions.size))
}
val vertexPartitions = vPartitioned.mapPartitions(
iter => Iterator(ShippableVertexPartition(iter)),
Expand Down Expand Up @@ -412,7 +410,7 @@ object VertexRDD {
): VertexRDD[VD] = {
val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match {
case Some(p) => vertices
case None => vertices.copartitionWithVertices(new HashPartitioner(vertices.partitions.size))
case None => vertices.partitionBy(new HashPartitioner(vertices.partitions.size))
}
val routingTables = createRoutingTables(edges, vPartitioned.partitioner.get)
val vertexPartitions = vPartitioned.zipPartitions(routingTables, preservesPartitioning = true) {
Expand Down Expand Up @@ -454,7 +452,7 @@ object VertexRDD {
.setName("VertexRDD.createRoutingTables - vid2pid (aggregation)")

val numEdgePartitions = edges.partitions.size
vid2pid.copartitionWithVertices(vertexPartitioner).mapPartitions(
vid2pid.partitionBy(vertexPartitioner).mapPartitions(
iter => Iterator(RoutingTablePartition.fromMsgs(numEdgePartitions, iter)),
preservesPartitioning = true)
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,6 @@ import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap

import org.apache.spark.graphx.impl.RoutingTablePartition.RoutingTableMessage

private[graphx]
class RoutingTableMessageRDDFunctions(self: RDD[RoutingTableMessage]) {
/** Copartition an `RDD[RoutingTableMessage]` with the vertex RDD with the given `partitioner`. */
def copartitionWithVertices(partitioner: Partitioner): RDD[RoutingTableMessage] = {
new ShuffledRDD[VertexId, Int, Int](
self, partitioner).setSerializer(new RoutingTableMessageSerializer)
}
}

private[graphx]
object RoutingTableMessageRDDFunctions {
import scala.language.implicitConversions

implicit def rdd2RoutingTableMessageRDDFunctions(rdd: RDD[RoutingTableMessage]) = {
new RoutingTableMessageRDDFunctions(rdd)
}
}

private[graphx]
object RoutingTablePartition {
/**
Expand Down
Loading