Skip to content

Commit 01745ee

Browse files
committed
fix deletion error
1 parent cc97dca commit 01745ee

File tree

2 files changed

+162
-68
lines changed

2 files changed

+162
-68
lines changed

mllib/src/main/scala/org/apache/spark/mllib/MLContext.scala

Lines changed: 0 additions & 68 deletions
This file was deleted.
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.mllib.util
19+
20+
import org.apache.spark.SparkContext
21+
import org.apache.spark.rdd.RDD
22+
import org.apache.spark.SparkContext._
23+
24+
import org.jblas.DoubleMatrix
25+
26+
import org.apache.spark.mllib.regression.LabeledPoint
27+
28+
import breeze.linalg.{Vector => BV, SparseVector => BSV, squaredDistance => breezeSquaredDistance}
29+
30+
/**
31+
* Helper methods to load, save and pre-process data used in ML Lib.
32+
*/
33+
object MLUtils {
34+
35+
private[util] lazy val EPSILON = {
36+
var eps = 1.0
37+
while ((1.0 + (eps / 2.0)) != 1.0) {
38+
eps /= 2.0
39+
}
40+
eps
41+
}
42+
43+
/**
44+
* Load labeled data from a file. The data format used here is
45+
* <L>, <f1> <f2> ...
46+
* where <f1>, <f2> are feature values in Double and <L> is the corresponding label as Double.
47+
*
48+
* @param sc SparkContext
49+
* @param dir Directory to the input data files.
50+
* @return An RDD of LabeledPoint. Each labeled point has two elements: the first element is
51+
* the label, and the second element represents the feature values (an array of Double).
52+
*/
53+
def loadLabeledData(sc: SparkContext, dir: String): RDD[LabeledPoint] = {
54+
sc.textFile(dir).map { line =>
55+
val parts = line.split(',')
56+
val label = parts(0).toDouble
57+
val features = parts(1).trim().split(' ').map(_.toDouble)
58+
LabeledPoint(label, features)
59+
}
60+
}
61+
62+
/**
63+
* Save labeled data to a file. The data format used here is
64+
* <L>, <f1> <f2> ...
65+
* where <f1>, <f2> are feature values in Double and <L> is the corresponding label as Double.
66+
*
67+
* @param data An RDD of LabeledPoints containing data to be saved.
68+
* @param dir Directory to save the data.
69+
*/
70+
def saveLabeledData(data: RDD[LabeledPoint], dir: String) {
71+
val dataStr = data.map(x => x.label + "," + x.features.mkString(" "))
72+
dataStr.saveAsTextFile(dir)
73+
}
74+
75+
/**
76+
* Utility function to compute mean and standard deviation on a given dataset.
77+
*
78+
* @param data - input data set whose statistics are computed
79+
* @param nfeatures - number of features
80+
* @param nexamples - number of examples in input dataset
81+
*
82+
* @return (yMean, xColMean, xColSd) - Tuple consisting of
83+
* yMean - mean of the labels
84+
* xColMean - Row vector with mean for every column (or feature) of the input data
85+
* xColSd - Row vector standard deviation for every column (or feature) of the input data.
86+
*/
87+
def computeStats(data: RDD[LabeledPoint], nfeatures: Int, nexamples: Long):
88+
(Double, DoubleMatrix, DoubleMatrix) = {
89+
val yMean: Double = data.map { labeledPoint => labeledPoint.label }.reduce(_ + _) / nexamples
90+
91+
// NOTE: We shuffle X by column here to compute column sum and sum of squares.
92+
val xColSumSq: RDD[(Int, (Double, Double))] = data.flatMap { labeledPoint =>
93+
val nCols = labeledPoint.features.length
94+
// Traverse over every column and emit (col, value, value^2)
95+
Iterator.tabulate(nCols) { i =>
96+
(i, (labeledPoint.features(i), labeledPoint.features(i)*labeledPoint.features(i)))
97+
}
98+
}.reduceByKey { case(x1, x2) =>
99+
(x1._1 + x2._1, x1._2 + x2._2)
100+
}
101+
val xColSumsMap = xColSumSq.collectAsMap()
102+
103+
val xColMean = DoubleMatrix.zeros(nfeatures, 1)
104+
val xColSd = DoubleMatrix.zeros(nfeatures, 1)
105+
106+
// Compute mean and unbiased variance using column sums
107+
var col = 0
108+
while (col < nfeatures) {
109+
xColMean.put(col, xColSumsMap(col)._1 / nexamples)
110+
val variance =
111+
(xColSumsMap(col)._2 - (math.pow(xColSumsMap(col)._1, 2) / nexamples)) / nexamples
112+
xColSd.put(col, math.sqrt(variance))
113+
col += 1
114+
}
115+
116+
(yMean, xColMean, xColSd)
117+
}
118+
119+
/**
120+
* Returns the squared Euclidean distance between two vectors. The following formula will be used
121+
* if it does not introduce too much numerical error:
122+
* <pre>
123+
* \|a - b\|_2^2 = \|a\|_2^2 + \|b\|_2^2 - 2 a^T b.
124+
* </pre>
125+
* When both vector norms are given, this is faster than computing the squared distance directly,
126+
* especially when one of the vectors is a sparse vector.
127+
*
128+
* @param v1 the first vector
129+
* @param norm1 the norm of the first vector, non-negative
130+
* @param v2 the second vector
131+
* @param norm2 the norm of the second vector, non-negative
132+
* @param precision desired relative precision for the squared distance
133+
* @return squared distance between v1 and v2 within the specified precision
134+
*/
135+
private[mllib] def fastSquaredDistance(
136+
v1: BV[Double],
137+
norm1: Double,
138+
v2: BV[Double],
139+
norm2: Double,
140+
precision: Double = 1e-6): Double = {
141+
val n = v1.size
142+
require(v2.size == n)
143+
require(norm1 >= 0.0 && norm2 >= 0.0)
144+
val sumSquaredNorm = norm1 * norm1 + norm2 * norm2
145+
val normDiff = norm1 - norm2
146+
var sqDist = 0.0
147+
val precisionBound1 = 2.0 * EPSILON * sumSquaredNorm / (normDiff * normDiff + EPSILON)
148+
if (precisionBound1 < precision) {
149+
sqDist = sumSquaredNorm - 2.0 * v1.dot(v2)
150+
} else if (v1.isInstanceOf[BSV[Double]] || v2.isInstanceOf[BSV[Double]]) {
151+
val dot = v1.dot(v2)
152+
sqDist = math.max(sumSquaredNorm - 2.0 * dot, 0.0)
153+
val precisionBound2 = EPSILON * (sumSquaredNorm + 2.0 * math.abs(dot)) / (sqDist + EPSILON)
154+
if (precisionBound2 > precision) {
155+
sqDist = breezeSquaredDistance(v1, v2)
156+
}
157+
} else {
158+
sqDist = breezeSquaredDistance(v1, v2)
159+
}
160+
sqDist
161+
}
162+
}

0 commit comments

Comments
 (0)