@@ -20,7 +20,6 @@ import breeze.linalg.{Vector => BV}
20
20
21
21
import org .apache .spark .mllib .linalg .{Vector , Vectors }
22
22
import org .apache .spark .rdd .RDD
23
- import breeze .linalg .axpy
24
23
25
24
case class VectorRDDStatisticalSummary (
26
25
mean : Vector ,
@@ -35,8 +34,8 @@ private case class VectorRDDStatisticalRing(
35
34
fakeM2n : BV [Double ],
36
35
totalCnt : Double ,
37
36
nnz : BV [Double ],
38
- max : BV [Double ],
39
- min : BV [Double ])
37
+ fakeMax : BV [Double ],
38
+ fakeMin : BV [Double ])
40
39
41
40
/**
42
41
* Extra functions available on RDDs of [[org.apache.spark.mllib.linalg.Vector Vector ]] through an
@@ -58,7 +57,9 @@ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
58
57
* with the size of Vector as input parameter.
59
58
*/
60
59
61
- private def seqOp (aggregator : VectorRDDStatisticalRing , currData : BV [Double ]): VectorRDDStatisticalRing = {
60
+ private def seqOp (
61
+ aggregator : VectorRDDStatisticalRing ,
62
+ currData : BV [Double ]): VectorRDDStatisticalRing = {
62
63
aggregator match {
63
64
case VectorRDDStatisticalRing (prevMean, prevM2n, cnt, nnzVec, maxVec, minVec) =>
64
65
currData.activeIterator.foreach {
@@ -73,7 +74,8 @@ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
73
74
nnzVec(id) += 1.0
74
75
}
75
76
76
- VectorRDDStatisticalRing (prevMean,
77
+ VectorRDDStatisticalRing (
78
+ prevMean,
77
79
prevM2n,
78
80
cnt + 1.0 ,
79
81
nnzVec,
@@ -82,7 +84,9 @@ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
82
84
}
83
85
}
84
86
85
- private def combOp (statistics1 : VectorRDDStatisticalRing , statistics2 : VectorRDDStatisticalRing ): VectorRDDStatisticalRing = {
87
+ private def combOp (
88
+ statistics1 : VectorRDDStatisticalRing ,
89
+ statistics2 : VectorRDDStatisticalRing ): VectorRDDStatisticalRing = {
86
90
(statistics1, statistics2) match {
87
91
case (VectorRDDStatisticalRing (mean1, m2n1, cnt1, nnz1, max1, min1),
88
92
VectorRDDStatisticalRing (mean2, m2n2, cnt2, nnz2, max2, min2)) =>
@@ -111,26 +115,34 @@ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
111
115
BV .fill(size)(Double .MinValue ),
112
116
BV .fill(size)(Double .MaxValue ))
113
117
114
- val breezeVectors = self.collect().map(_.toBreeze)
115
- val VectorRDDStatisticalRing (fakeMean, fakeM2n, totalCnt, nnz, max, min) = breezeVectors.aggregate(zeroValue)(seqOp, combOp)
118
+ val breezeVectors = self.map(_.toBreeze)
119
+ val VectorRDDStatisticalRing (fakeMean, fakeM2n, totalCnt, nnz, fakeMax, fakeMin) =
120
+ breezeVectors.aggregate(zeroValue)(seqOp, combOp)
116
121
117
122
// solve real mean
118
123
val realMean = fakeMean :* nnz :/ totalCnt
119
124
// solve real variance
120
125
val deltaMean = fakeMean :- 0.0
121
126
val realVar = fakeM2n - ((deltaMean :* deltaMean) :* (nnz :* (nnz :- totalCnt)) :/ totalCnt)
122
- // max, min process, in case of a column is all zero.
123
- // max :+= 0.0
124
- // min :+= 0.0
125
-
127
+ // max, min
128
+ val max = Vectors .sparse(size, fakeMax.activeIterator.map { case (id, value) =>
129
+ if ((value == Double .MinValue ) && (realMean(id) != Double .MinValue )) (id, 0.0 )
130
+ else (id, value)
131
+ }.toSeq)
132
+ val min = Vectors .sparse(size, fakeMin.activeIterator.map { case (id, value) =>
133
+ if ((value == Double .MaxValue ) && (realMean(id) != Double .MaxValue )) (id, 0.0 )
134
+ else (id, value)
135
+ }.toSeq)
136
+
137
+ // get variance
126
138
realVar :/= totalCnt
127
139
128
140
VectorRDDStatisticalSummary (
129
141
Vectors .fromBreeze(realMean),
130
142
Vectors .fromBreeze(realVar),
131
143
totalCnt.toLong,
132
144
Vectors .fromBreeze(nnz),
133
- Vectors .fromBreeze( max) ,
134
- Vectors .fromBreeze( min) )
145
+ max,
146
+ min)
135
147
}
136
148
}
0 commit comments