Skip to content

Commit

Permalink
Merge pull request alteryx#103 from JoshRosen/unpersist-fix
Browse files Browse the repository at this point in the history
Add unpersist() to JavaDoubleRDD and JavaPairRDD.

This fixes a minor inconsistency where [unpersist() was only available on JavaRDD](https://mail-archives.apache.org/mod_mbox/incubator-spark-user/201310.mbox/%3CCE8D8748.68C0%25YannLuppo%40livenation.com%3E) and not JavaPairRDD / JavaDoubleRDD.   I also added support for the new optional `blocking` argument added in 0.8.

Please merge this into branch-0.8, too.
  • Loading branch information
rxin committed Oct 24, 2013
2 parents dadfc63 + 210858a commit a098438
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 0 deletions.
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,19 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
*/
def persist(newLevel: StorageLevel): JavaDoubleRDD = fromRDD(srdd.persist(newLevel))

/**
* Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
* This method blocks until all blocks are deleted.
*/
def unpersist(): JavaDoubleRDD = fromRDD(srdd.unpersist())

/**
* Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
*
* @param blocking Whether to block until all blocks are deleted.
*/
def unpersist(blocking: Boolean): JavaDoubleRDD = fromRDD(srdd.unpersist(blocking))

// first() has to be overriden here in order for its return type to be Double instead of Object.
override def first(): Double = srdd.first()

Expand Down
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,19 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
def persist(newLevel: StorageLevel): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.persist(newLevel))

/**
* Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
* This method blocks until all blocks are deleted.
*/
def unpersist(): JavaPairRDD[K, V] = wrapRDD(rdd.unpersist())

/**
* Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
*
* @param blocking Whether to block until all blocks are deleted.
*/
def unpersist(blocking: Boolean): JavaPairRDD[K, V] = wrapRDD(rdd.unpersist(blocking))

// Transformations (return a new RDD)

/**
Expand Down
8 changes: 8 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,17 @@ JavaRDDLike[T, JavaRDD[T]] {

/**
* Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
* This method blocks until all blocks are deleted.
*/
def unpersist(): JavaRDD[T] = wrapRDD(rdd.unpersist())

/**
* Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
*
* @param blocking Whether to block until all blocks are deleted.
*/
def unpersist(blocking: Boolean): JavaRDD[T] = wrapRDD(rdd.unpersist(blocking))

// Transformations (return a new RDD)

/**
Expand Down

0 comments on commit a098438

Please sign in to comment.