Skip to content

Commit 48c12c2

Browse files
committed
Add Java version and additional doc
1 parent e5381cd commit 48c12c2

File tree

2 files changed

+31
-3
lines changed

2 files changed

+31
-3
lines changed

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,32 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
758758
rdd.saveAsHadoopDataset(conf)
759759
}
760760

761+
/**
762+
* Repartition the RDD according to the given partitioner and, within each resulting partition,
763+
* sort records by their keys.
764+
*
765+
* This is more efficient than calling `repartition` and then sorting within each partition
766+
* because it can push the sorting down into the shuffle machinery.
767+
*/
768+
def repartitionAndSortWithinPartition(partitioner: Partitioner): JavaPairRDD[K, V] = {
  // No comparator supplied: use Guava's natural ordering, viewed as a `Comparator[K]`,
  // and hand off to the overload that accepts an explicit comparator.
  val naturalOrder: Comparator[K] =
    com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
  repartitionAndSortWithinPartition(partitioner, naturalOrder)
}
772+
773+
/**
774+
* Repartition the RDD according to the given partitioner and, within each resulting partition,
775+
* sort records by their keys.
776+
*
777+
* This is more efficient than calling `repartition` and then sorting within each partition
778+
* because it can push the sorting down into the shuffle machinery.
779+
*/
780+
// partitioner: decides which resulting partition each record is sent to.
// comp: comparator over the keys, used to sort records within each resulting partition.
def repartitionAndSortWithinPartition(partitioner: Partitioner, comp: Comparator[K])
    : JavaPairRDD[K, V] = {
  // `Comparator` is generic and must be applied to the key type `K`; with a
  // `Comparator[K]` in implicit scope, Scala's Comparator-to-Ordering conversion can
  // provide the `Ordering[K]` that `OrderedRDDFunctions` requires for its key type.
  implicit val ordering: Comparator[K] = comp
  fromRDD(
    new OrderedRDDFunctions[K, V, (K, V)](rdd).repartitionAndSortWithinPartition(partitioner))
}
786+
761787
/**
762788
* Sort the RDD by key, so that each partition contains a sorted range of the elements in
763789
* ascending order. Calling `collect` or `save` on the resulting RDD will return or output an

core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,13 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
6868
/**
6969
* Repartition the RDD according to the given partitioner and, within each resulting partition,
7070
* sort records by their keys.
71+
*
72+
* This is more efficient than calling `repartition` and then sorting within each partition
73+
* because it can push the sorting down into the shuffle machinery.
7174
*/
72-
def repartitionAndSortWithinPartition(partitioner: Partitioner, ascending: Boolean = true)
75+
def repartitionAndSortWithinPartition(partitioner: Partitioner)
7376
: RDD[(K, V)] = {
74-
new ShuffledRDD[K, V, V](self, partitioner)
75-
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
77+
new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
7678
}
7779

7880
}

0 commit comments

Comments
 (0)