[SPARK-5461] [graphx] Add isCheckpointed, getCheckpointedFiles methods to Graph

Added the 2 methods to Graph and GraphImpl.  Both make calls to the underlying vertex and edge RDDs.

This is needed for another PR (for LDA): [apache#4047]

Notes:
* getCheckpointFiles is plural and returns a Seq[String] instead of an Option[String].
* I attempted to test that the methods return the correct values after checkpointing, but the test did not work; I guess that checkpointing does not occur quickly enough?  I noticed that there are no checkpointing tests for RDDs; is it just hard to test well?
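
The second note asks whether checkpointing "does not occur quickly enough". As far as I know, `checkpoint()` only marks an RDD, and the files are written when the first action on that RDD runs. That would also explain why the GraphSuite change in this commit calls `count()` on both edges and vertices before asserting. A minimal sketch of that ordering, assuming a local Spark build that includes this commit on the classpath; the object name, app name, and temp directory prefix are hypothetical:

```scala
import java.nio.file.Files

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

// Hypothetical driver object, not part of this commit.
object GraphCheckpointSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("graph-checkpoint-sketch")
    val sc = new SparkContext(conf)
    try {
      // Checkpointing needs a directory; a temp dir is enough for a local run.
      sc.setCheckpointDir(Files.createTempDirectory("graph-checkpoint").toString)

      // Same ring graph as the GraphSuite test in this commit.
      val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1) }
      val graph = Graph.fromEdges(sc.parallelize(ring), 1.0F)

      // Marks the vertex and edge RDDs for checkpointing; no job has run yet.
      graph.checkpoint()
      println(s"before actions: isCheckpointed = ${graph.isCheckpointed}")

      // Run one action per RDD so that each one is actually materialized.
      graph.edges.map(_.attr).count()
      graph.vertices.map(_._2).count()

      // The commit's test asserts isCheckpointed and two checkpoint files at this point.
      println(s"after actions: isCheckpointed = ${graph.isCheckpointed}")
      graph.getCheckpointFiles.foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

A test that checks `isCheckpointed` immediately after `checkpoint()`, without running an action on both RDDs first, would presumably see the "not checkpointed" state regardless of how long it waits.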

CC: rxin

CC: mengxr  (since related to LDA)

Author: Joseph K. Bradley <joseph@databricks.com>

Closes apache#4253 from jkbradley/graphx-checkpoint and squashes the following commits:

b680148 [Joseph K. Bradley] added class tag to firstParent call in VertexRDDImpl.isCheckpointed, though not needed to compile
250810e [Joseph K. Bradley] In EdgeRDDImpl, VertexRDDImpl, added transient back to partitionsRDD, and made isCheckpointed check firstParent instead of partitionsRDD
695b7a3 [Joseph K. Bradley] changed partitionsRDD in EdgeRDDImpl, VertexRDDImpl to be non-transient
cc00767 [Joseph K. Bradley] added overrides for isCheckpointed, getCheckpointFile in EdgeRDDImpl, VertexRDDImpl. The corresponding Graph methods now work.
188665f [Joseph K. Bradley] improved documentation
235738c [Joseph K. Bradley] Added isCheckpointed and getCheckpointFiles to Graph, GraphImpl
jkbradley authored and mengxr committed Feb 2, 2015
1 parent 5a55261 commit 842d000
Showing 6 changed files with 51 additions and 2 deletions.
12 changes: 12 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -104,6 +104,18 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
   */
  def checkpoint(): Unit

  /**
   * Return whether this Graph has been checkpointed or not.
   * This returns true iff both the vertices RDD and edges RDD have been checkpointed.
   */
  def isCheckpointed: Boolean

  /**
   * Gets the name of the files to which this Graph was checkpointed.
   * (The vertices RDD and edges RDD are checkpointed separately.)
   */
  def getCheckpointFiles: Seq[String]

  /**
   * Uncaches both vertices and edges of this graph. This is useful in iterative algorithms that
   * build a new graph in each iteration.
@@ -73,7 +73,15 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
  override def checkpoint() = {
    partitionsRDD.checkpoint()
  }

  override def isCheckpointed: Boolean = {
    firstParent[(PartitionID, EdgePartition[ED, VD])].isCheckpointed
  }

  override def getCheckpointFile: Option[String] = {
    partitionsRDD.getCheckpointFile
  }

  /** The number of edges in the RDD. */
  override def count(): Long = {
    partitionsRDD.map(_._2.size.toLong).reduce(_ + _)
11 changes: 11 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -70,6 +70,17 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
    replicatedVertexView.edges.checkpoint()
  }

  override def isCheckpointed: Boolean = {
    vertices.isCheckpointed && replicatedVertexView.edges.isCheckpointed
  }

  override def getCheckpointFiles: Seq[String] = {
    Seq(vertices.getCheckpointFile, replicatedVertexView.edges.getCheckpointFile).flatMap {
      case Some(path) => Seq(path)
      case None => Seq()
    }
  }

  override def unpersist(blocking: Boolean = true): Graph[VD, ED] = {
    unpersistVertices(blocking)
    replicatedVertexView.edges.unpersist(blocking)
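
The `getCheckpointFiles` override in GraphImpl collects two `Option[String]` values and keeps only the defined ones, which is why the result can hold zero, one, or two paths. The sketch below isolates that step in plain Scala (no Spark needed) and compares it with `flatten`, which should behave the same way. The object name, method names, and paths are hypothetical and only illustrate the pattern; they are not part of the commit.

```scala
// Hypothetical stand-alone model of the Option handling in GraphImpl.getCheckpointFiles.
object CheckpointFilesSketch {
  // Mirrors the pattern match used in the commit.
  def viaPatternMatch(files: Seq[Option[String]]): Seq[String] =
    files.flatMap {
      case Some(path) => Seq(path)
      case None => Seq()
    }

  // Alternative spelling: flatten unwraps each Some and drops each None.
  def viaFlatten(files: Seq[Option[String]]): Seq[String] = files.flatten

  def main(args: Array[String]): Unit = {
    // Made-up checkpoint paths for the vertex and edge RDDs.
    val both: Seq[Option[String]] = Seq(Some("/tmp/ckpt/vertices"), Some("/tmp/ckpt/edges"))
    val onlyVertices: Seq[Option[String]] = Seq(Some("/tmp/ckpt/vertices"), None)
    val neither: Seq[Option[String]] = Seq(None, None)

    for (files <- Seq(both, onlyVertices, neither)) {
      // Both spellings should agree on every input.
      assert(viaPatternMatch(files) == viaFlatten(files))
      println(s"${files.size} inputs -> ${viaFlatten(files).size} checkpoint file(s)")
    }
  }
}
```

The one-path case is why `isCheckpointed` (a logical AND over both RDDs) and `getCheckpointFiles.nonEmpty` are not interchangeable checks.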
@@ -74,7 +74,15 @@ class VertexRDDImpl[VD] private[graphx] (
  override def checkpoint() = {
    partitionsRDD.checkpoint()
  }

  override def isCheckpointed: Boolean = {
    firstParent[ShippableVertexPartition[VD]].isCheckpointed
  }

  override def getCheckpointFile: Option[String] = {
    partitionsRDD.getCheckpointFile
  }

  /** The number of vertices in the RDD. */
  override def count(): Long = {
    partitionsRDD.map(_.size).reduce(_ + _)
@@ -375,6 +375,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
      val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1)}
      val rdd = sc.parallelize(ring)
      val graph = Graph.fromEdges(rdd, 1.0F)
      assert(!graph.isCheckpointed)
      assert(graph.getCheckpointFiles.size === 0)
      graph.checkpoint()
      graph.edges.map(_.attr).count()
      graph.vertices.map(_._2).count()
@@ -383,6 +385,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
      val verticesDependencies = graph.vertices.partitionsRDD.dependencies
      assert(edgesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]]))
      assert(verticesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]]))
      assert(graph.isCheckpointed)
      assert(graph.getCheckpointFiles.size === 2)
    }
  }
6 changes: 6 additions & 0 deletions project/MimaExcludes.scala
@@ -127,6 +127,12 @@ object MimaExcludes {
            // SPARK-5315 Spark Streaming Java API returns Scala DStream
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.streaming.api.java.JavaDStreamLike.reduceByWindow")
          ) ++ Seq(
            // SPARK-5461 Graph should have isCheckpointed, getCheckpointFiles methods
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.graphx.Graph.getCheckpointFiles"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.graphx.Graph.isCheckpointed")
          )

        case v if v.startsWith("1.2") =>
