Skip to content

Commit 1a97558

Browse files
committed
added max and min to StatCounter output, updated doc
1 parent a5c13b0 commit 1a97558

File tree

5 files changed

+29
-9
lines changed

5 files changed

+29
-9
lines changed

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,11 +477,23 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
477477
new java.util.ArrayList(arr)
478478
}
479479

480+
/**
481+
* Returns the maximum element from this RDD as defined by the specified
482+
* Comparator[T].
483+
* @param comp the comparator that defines ordering
484+
* @return the maximum of the RDD
485+
* */
480486
def max(comp: Comparator[T]): T = {
481487
import scala.collection.JavaConversions._
482488
rdd.max()(Ordering.comparatorToOrdering(comp))
483489
}
484490

491+
/**
492+
* Returns the minimum element from this RDD as defined by the specified
493+
* Comparator[T].
494+
* @param comp the comparator that defines ordering
495+
* @return the minimum of the RDD
496+
* */
485497
def min(comp: Comparator[T]): T = {
486498
import scala.collection.JavaConversions._
487499
rdd.min()(Ordering.comparatorToOrdering(comp))

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -958,9 +958,17 @@ abstract class RDD[T: ClassTag](
958958
*/
959959
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)
960960

961-
def max()(implicit ord: Ordering[T]):T = this.reduce{(x,y) => ord.max(x,y)}
961+
/**
962+
* Returns the max of this RDD as defined by the implicit Ordering[T].
963+
* @return the maximum element of the RDD
964+
* */
965+
def max()(implicit ord: Ordering[T]):T = this.reduce(ord.max)
962966

963-
def min()(implicit ord: Ordering[T]):T = this.reduce{(x,y) => ord.min(x,y)}
967+
/**
968+
* Returns the min of this RDD as defined by the implicit Ordering[T].
969+
* @return the minimum element of the RDD
970+
* */
971+
def min()(implicit ord: Ordering[T]):T = this.reduce(ord.min)
964972

965973
/**
966974
* Save this RDD as a text file, using string representations of elements.

core/src/main/scala/org/apache/spark/util/StatCounter.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
2929
private var n: Long = 0 // Running count of our values
3030
private var mu: Double = 0 // Running mean of our values
3131
private var m2: Double = 0 // Running variance numerator (sum of (x - mean)^2)
32-
private var max_v: Double = 0 // Running max of our values
33-
private var min_v: Double = 0 // Running min of our values
32+
private var max_v: Double = Double.NegativeInfinity // Running max of our values
33+
private var min_v: Double = Double.PositiveInfinity // Running min of our values
3434

3535
merge(values)
3636

@@ -135,7 +135,7 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
135135
def sampleStdev: Double = math.sqrt(sampleVariance)
136136

137137
override def toString: String = {
138-
"(count: %d, mean: %f, stdev: %f)".format(count, mean, stdev)
138+
"(count: %d, mean: %f, stdev: %f, max: %f, min: %f)".format(count, mean, stdev, max, min)
139139
}
140140
}
141141

core/src/test/scala/org/apache/spark/PartitioningSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,8 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
171171
assert(abs(6.0/2 - rdd.mean) < 0.01)
172172
assert(abs(1.0 - rdd.variance) < 0.01)
173173
assert(abs(1.0 - rdd.stdev) < 0.01)
174-
assert(abs(4.0 - stats.max) === 0)
175-
assert(abs(-1.0 - stats.max) === 0)
174+
assert(stats.max === 4.0)
175+
assert(stats.min === -1.0)
176176

177177
// Add other tests here for classes that should be able to handle empty partitions correctly
178178
}

python/pyspark/rdd.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -543,7 +543,7 @@ def max(self):
543543
>>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max()
544544
43.0
545545
"""
546-
return self.stats().max()
546+
return self.reduce(max)
547547

548548
def min(self):
549549
"""
@@ -552,7 +552,7 @@ def min(self):
552552
>>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min()
553553
1.0
554554
"""
555-
return self.stats().min()
555+
return self.reduce(min)
556556

557557
def sum(self):
558558
"""

0 commit comments

Comments
 (0)