Commit a5c13b0

Added min and max to Scala and Java RDD, added min and max to StatCounter
1 parent ed67136 · commit a5c13b0

6 files changed: +33 -2 lines changed

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

Lines changed: 10 additions & 0 deletions
@@ -477,6 +477,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     new java.util.ArrayList(arr)
   }
 
+  def max(comp: Comparator[T]): T = {
+    import scala.collection.JavaConversions._
+    rdd.max()(Ordering.comparatorToOrdering(comp))
+  }
+
+  def min(comp: Comparator[T]): T = {
+    import scala.collection.JavaConversions._
+    rdd.min()(Ordering.comparatorToOrdering(comp))
+  }
+
   /**
    * Returns the first K elements from this RDD using the
    * natural ordering for T while maintain the order.
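
The Java-facing overloads above reuse the Scala implementation by bridging a java.util.Comparator to an Ordering via Ordering.comparatorToOrdering. A minimal sketch of that bridge on its own, outside Spark (the byLength comparator is a made-up example, not part of the patch):

import java.util.Comparator

// Hypothetical comparator ordering strings by length.
val byLength: Comparator[String] = new Comparator[String] {
  def compare(a: String, b: String): Int = a.length - b.length
}

// The same bridge the new max(comp)/min(comp) methods use internally.
val ord: Ordering[String] = Ordering.comparatorToOrdering(byLength)
ord.max("spark", "rdd")  // "spark" -- the longer string wins under byLength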

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 4 additions & 0 deletions
@@ -958,6 +958,10 @@ abstract class RDD[T: ClassTag](
    */
   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)
 
+  def max()(implicit ord: Ordering[T]):T = this.reduce{(x,y) => ord.max(x,y)}
+
+  def min()(implicit ord: Ordering[T]):T = this.reduce{(x,y) => ord.min(x,y)}
+
   /**
    * Save this RDD as a text file, using string representations of elements.
    */
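
Both methods are plain reductions under the implicit Ordering, so they run pairwise across partitions and, like any reduce, throw on an empty RDD. A minimal sketch of the same semantics on an ordinary Scala collection (no SparkContext assumed):

// Pairwise reduction with Ordering.max/min, as in the two one-liners above.
val ord = Ordering[Int]  // the implicit natural ordering
val nums = Seq(3, 1, 4, 1, 5)
val maxValue = nums.reduce((x, y) => ord.max(x, y))  // 5
val minValue = nums.reduce((x, y) => ord.min(x, y))  // 1

// A non-default ordering can be supplied explicitly, mirroring rdd.max()(ord):
val byAbs: Ordering[Int] = Ordering.by((x: Int) => math.abs(x))
Seq(-7, 3).reduce((x, y) => byAbs.max(x, y))  // -7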

core/src/main/scala/org/apache/spark/util/StatCounter.scala

Lines changed: 15 additions & 1 deletion
@@ -29,6 +29,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
   private var n: Long = 0    // Running count of our values
   private var mu: Double = 0 // Running mean of our values
   private var m2: Double = 0 // Running variance numerator (sum of (x - mean)^2)
+  private var max_v: Double = 0 // Running max of our values
+  private var min_v: Double = 0 // Running min of our values
 
   merge(values)
 
@@ -41,6 +43,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
     n += 1
     mu += delta / n
     m2 += delta * (value - mu)
+    max_v = math.max(max_v, value)
+    min_v = math.min(min_v, value)
     this
   }
 
@@ -58,7 +62,9 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
       if (n == 0) {
         mu = other.mu
         m2 = other.m2
-        n = other.n
+        n = other.n
+        max_v = other.max_v
+        min_v = other.min_v
       } else if (other.n != 0) {
         val delta = other.mu - mu
         if (other.n * 10 < n) {
@@ -70,6 +76,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
         }
         m2 += other.m2 + (delta * delta * n * other.n) / (n + other.n)
         n += other.n
+        max_v = math.max(max_v, other.max_v)
+        min_v = math.min(min_v, other.min_v)
       }
       this
     }
@@ -81,6 +89,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
     other.n = n
     other.mu = mu
     other.m2 = m2
+    other.max_v = max_v
+    other.min_v = min_v
     other
   }
 
@@ -90,6 +100,10 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
 
   def sum: Double = n * mu
 
+  def max: Double = max_v
+
+  def min: Double = min_v
+
   /** Return the variance of the values. */
   def variance: Double = {
     if (n == 0) {
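
The running extremes follow the same single-pass, mergeable pattern as the existing mean/variance fields: fold values per partition, then merge the partial counters. A standalone sketch of just that pattern (the class name and seeds are this sketch's choices; note it seeds at positive/negative infinity, whereas the patch seeds both fields at 0, which would leave max at 0 for all-negative inputs and min at 0 for all-positive ones):

// Mergeable running min/max, mirroring the max_v/min_v fields added above.
class MinMax extends Serializable {
  private var maxV = Double.NegativeInfinity
  private var minV = Double.PositiveInfinity

  // Fold one value into the running extremes.
  def merge(value: Double): this.type = {
    maxV = math.max(maxV, value)
    minV = math.min(minV, value)
    this
  }

  // Combine with another partition's partial result.
  def merge(other: MinMax): this.type = {
    maxV = math.max(maxV, other.maxV)
    minV = math.min(minV, other.minV)
    this
  }

  def max: Double = maxV
  def min: Double = minV
}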

core/src/test/scala/org/apache/spark/PartitioningSuite.scala

Lines changed: 2 additions & 0 deletions
@@ -171,6 +171,8 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMethodTester {
     assert(abs(6.0/2 - rdd.mean) < 0.01)
     assert(abs(1.0 - rdd.variance) < 0.01)
     assert(abs(1.0 - rdd.stdev) < 0.01)
+    assert(abs(4.0 - stats.max) === 0)
+    assert(abs(-1.0 - stats.max) === 0)
 
     // Add other tests here for classes that should be able to handle empty partitions correctly
   }
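
The stats value asserted against here is bound earlier in the test, outside the hunk shown; on an RDD[Double] it would come from rdd.stats(), which builds one StatCounter per partition and merges them, so the new max/min ride along with count, mean, and variance in a single pass. A hedged sketch of that usage (assuming a live SparkContext sc and the double-RDD implicits in scope):

// rdd.stats() returns the merged StatCounter, now carrying max/min too.
val rdd = sc.parallelize(Seq(2.0, 4.0), 2)
val stats = rdd.stats()
stats.max  // 4.0
stats.min  // 2.0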

core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala

Lines changed: 2 additions & 0 deletions
@@ -47,6 +47,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
     assert(nums.collect({ case i if i >= 3 => i.toString }).collect().toList === List("3", "4"))
     assert(nums.keyBy(_.toString).collect().toList === List(("1", 1), ("2", 2), ("3", 3), ("4", 4)))
+    assert(nums.max() === 4)
+    assert(nums.min() === 1)
     val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
     assert(partitionSums.collect().toList === List(3, 7))
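
The two added asserts exercise the natural-ordering path. Because the ordering is an implicit second parameter list, a caller can also pass one explicitly; a small sketch against the same nums RDD the suite builds:

// Natural ordering, as asserted above:
nums.max()  // 4
nums.min()  // 1

// An explicit Ordering flips the result:
nums.max()(Ordering[Int].reverse)  // 1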

python/pyspark/rdd.py

Lines changed: 0 additions & 1 deletion
@@ -24,7 +24,6 @@
 import sys
 import shlex
 import traceback
-from bisect import bisect_right
 from subprocess import Popen, PIPE
 from tempfile import NamedTemporaryFile
 from threading import Thread
