@@ -78,27 +78,27 @@ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
78
78
maxVec,
79
79
minVec)
80
80
},
81
- combOp = (lhs, rhs ) => (lhs, rhs ) match {
81
+ combOp = (c, v ) => (c, v ) match {
82
82
case (
83
- (lhsMean, lhsM2n, lhsCnt, lhsNNZ, lhsMax, lhsMin ),
84
- (rhsMean, rhsM2n, rhsCnt, rhsNNZ, rhsMax, rhsMin )) =>
85
- val totalCnt = lhsCnt + rhsCnt
86
- val deltaMean = rhsMean - lhsMean
87
- lhsMean :*= (lhsCnt / totalCnt)
88
- axpy(rhsCnt / totalCnt, rhsMean, lhsMean )
89
- val totalMean = lhsMean
83
+ (mean1, m2n1, cnt1, nnz1, max1, min1 ),
84
+ (mean2, m2n2, cnt2, nnz2, max2, min2 )) =>
85
+ val totalCnt = cnt1 + cnt2
86
+ val deltaMean = mean2 - mean1
87
+ mean1 :*= (cnt1 / totalCnt)
88
+ axpy(cnt2 / totalCnt, mean2, mean1 )
89
+ val totalMean = mean1
90
90
deltaMean :*= deltaMean
91
- axpy(lhsCnt * rhsCnt / totalCnt, deltaMean, lhsM2n )
92
- axpy(1.0 , rhsM2n, lhsM2n )
93
- val totalM2n = lhsM2n
94
- rhsMax .activeIterator.foreach { case (id, value) =>
95
- if (lhsMax (id) < value) lhsMax (id) = value
91
+ axpy(cnt1 * cnt2 / totalCnt, deltaMean, m2n1 )
92
+ axpy(1.0 , m2n2, m2n1 )
93
+ val totalM2n = m2n1
94
+ max2 .activeIterator.foreach { case (id, value) =>
95
+ if (max1 (id) < value) max1 (id) = value
96
96
}
97
- rhsMin .activeIterator.foreach { case (id, value) =>
98
- if (lhsMin (id) > value) lhsMin (id) = value
97
+ min2 .activeIterator.foreach { case (id, value) =>
98
+ if (min1 (id) > value) min1 (id) = value
99
99
}
100
- axpy(1.0 , rhsNNZ, lhsNNZ )
101
- (totalMean, totalM2n, totalCnt, lhsNNZ, lhsMax, lhsMin )
100
+ axpy(1.0 , nnz2, nnz1 )
101
+ (totalMean, totalM2n, totalCnt, nnz1, max1, min1 )
102
102
}
103
103
)
104
104
0 commit comments