17
17
18
18
package org .apache .spark .mllib .rdd
19
19
20
- import breeze .linalg .{axpy , Vector => BV }
20
+ import breeze .linalg .{Vector => BV , DenseVector => BDV }
21
21
22
22
import org .apache .spark .mllib .linalg .{Vectors , Vector }
23
23
import org .apache .spark .rdd .RDD
@@ -29,60 +29,59 @@ import org.apache.spark.rdd.RDD
29
29
trait VectorRDDStatisticalSummary {
30
30
def mean : Vector
31
31
def variance : Vector
32
- def totalCount : Long
32
+ def count : Long
33
33
def numNonZeros : Vector
34
34
def max : Vector
35
35
def min : Vector
36
36
}
37
37
38
38
/**
39
39
* Aggregates [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary ]]
40
- * together with add() and merge() function.
40
+ * together with add() and merge() function. Online variance solution used in add() function, while
41
+ * parallel variance solution used in merge() function. Reference here:
42
+ * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki ]]. Solution here
43
+ * ignoring the zero elements when calling add() and merge(), for decreasing the O(n) algorithm to
44
+ * O(nnz). Real variance is computed here after we get other statistics, simply by another parallel
45
+ * combination process.
41
46
*/
42
- private class Aggregator (
43
- val currMean : BV [Double ],
44
- val currM2n : BV [Double ],
47
+ private class VectorRDDStatisticsAggregator (
48
+ val currMean : BDV [Double ],
49
+ val currM2n : BDV [Double ],
45
50
var totalCnt : Double ,
46
- val nnz : BV [Double ],
47
- val currMax : BV [Double ],
48
- val currMin : BV [Double ]) extends VectorRDDStatisticalSummary with Serializable {
51
+ val nnz : BDV [Double ],
52
+ val currMax : BDV [Double ],
53
+ val currMin : BDV [Double ]) extends VectorRDDStatisticalSummary with Serializable {
49
54
50
55
// lazy val is used for computing only once time. Same below.
51
56
override lazy val mean = Vectors .fromBreeze(currMean :* nnz :/ totalCnt)
52
57
53
- // Online variance solution used in add() function, while parallel variance solution used in
54
- // merge() function. Reference here:
55
- // http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
56
- // Solution here ignoring the zero elements when calling add() and merge(), for decreasing the
57
- // O(n) algorithm to O(nnz). Real variance is computed here after we get other statistics, simply
58
- // by another parallel combination process.
59
58
override lazy val variance = {
60
59
val deltaMean = currMean
61
60
var i = 0
62
- while (i < currM2n.size) {
63
- currM2n(i) += deltaMean(i) * deltaMean(i) * nnz(i) * (totalCnt- nnz(i)) / totalCnt
61
+ while (i < currM2n.size) {
62
+ currM2n(i) += deltaMean(i) * deltaMean(i) * nnz(i) * (totalCnt - nnz(i)) / totalCnt
64
63
currM2n(i) /= totalCnt
65
64
i += 1
66
65
}
67
66
Vectors .fromBreeze(currM2n)
68
67
}
69
68
70
- override lazy val totalCount : Long = totalCnt.toLong
69
+ override lazy val count : Long = totalCnt.toLong
71
70
72
71
override lazy val numNonZeros : Vector = Vectors .fromBreeze(nnz)
73
72
74
73
override lazy val max : Vector = {
75
74
nnz.iterator.foreach {
76
75
case (id, count) =>
77
- if ((count == 0.0 ) || ((count < totalCnt) && (currMax(id) < 0.0 ) )) currMax(id) = 0.0
76
+ if ((count < totalCnt) && (currMax(id) < 0.0 )) currMax(id) = 0.0
78
77
}
79
78
Vectors .fromBreeze(currMax)
80
79
}
81
80
82
81
override lazy val min : Vector = {
83
82
nnz.iterator.foreach {
84
83
case (id, count) =>
85
- if ((count == 0.0 ) || ((count < totalCnt) && (currMin(id) > 0.0 ) )) currMin(id) = 0.0
84
+ if ((count < totalCnt) && (currMin(id) > 0.0 )) currMin(id) = 0.0
86
85
}
87
86
Vectors .fromBreeze(currMin)
88
87
}
@@ -92,7 +91,7 @@ private class Aggregator(
92
91
*/
93
92
def add (currData : BV [Double ]): this .type = {
94
93
currData.activeIterator.foreach {
95
- // this case is used for filtering the zero elements if the vector is a dense one .
94
+ // this case is used for filtering the zero elements if the vector.
96
95
case (id, 0.0 ) =>
97
96
case (id, value) =>
98
97
if (currMax(id) < value) currMax(id) = value
@@ -112,7 +111,7 @@ private class Aggregator(
112
111
/**
113
112
* Combine function used for combining intermediate results together from every worker.
114
113
*/
115
- def merge (other : Aggregator ): this .type = {
114
+ def merge (other : VectorRDDStatisticsAggregator ): this .type = {
116
115
117
116
totalCnt += other.totalCnt
118
117
@@ -145,7 +144,7 @@ private class Aggregator(
145
144
if (currMin(id) > value) currMin(id) = value
146
145
}
147
146
148
- axpy( 1.0 , other.nnz, nnz)
147
+ nnz += other.nnz
149
148
this
150
149
}
151
150
}
@@ -160,18 +159,18 @@ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
160
159
/**
161
160
* Compute full column-wise statistics for the RDD with the size of Vector as input parameter.
162
161
*/
163
- def summarizeStatistics (): VectorRDDStatisticalSummary = {
164
- val size = self.take( 1 ).head .size
162
+ def computeSummaryStatistics (): VectorRDDStatisticalSummary = {
163
+ val size = self.first() .size
165
164
166
- val zeroValue = new Aggregator (
167
- BV .zeros[Double ](size),
168
- BV .zeros[Double ](size),
165
+ val zeroValue = new VectorRDDStatisticsAggregator (
166
+ BDV .zeros[Double ](size),
167
+ BDV .zeros[Double ](size),
169
168
0.0 ,
170
- BV .zeros[Double ](size),
171
- BV .fill(size)(Double .MinValue ),
172
- BV .fill(size)(Double .MaxValue ))
169
+ BDV .zeros[Double ](size),
170
+ BDV .fill(size)(Double .MinValue ),
171
+ BDV .fill(size)(Double .MaxValue ))
173
172
174
- self.map(_.toBreeze).aggregate[Aggregator ](zeroValue)(
173
+ self.map(_.toBreeze).aggregate[VectorRDDStatisticsAggregator ](zeroValue)(
175
174
(aggregator, data) => aggregator.add(data),
176
175
(aggregator1, aggregator2) => aggregator1.merge(aggregator2)
177
176
)
0 commit comments