-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-8341] Significant selector feature transformation #6795
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.mllib.feature | ||
|
||
import scala.collection.mutable | ||
|
||
import org.apache.spark.annotation.Experimental | ||
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors} | ||
import org.apache.spark.rdd.RDD | ||
|
||
/**
 * :: Experimental ::
 * Model that extracts the significant indices from a vector.
 *
 * An index is significant when its value is not the same in every vector the model was
 * fitted on. For example, HashingTF produces big sparse vectors; this model converts them
 * to smaller vectors that leave out every index whose value is identical in all vectors.
 *
 * @param indices array of significant indices. Position `i` of a transformed vector holds the
 *                input value found at `indices(i)`.
 */
@Experimental
class SignificantSelectorModel(val indices: Array[Int]) extends VectorTransformer {

  /**
   * Applies transformation on a vector.
   *
   * Dense input yields a dense vector and sparse input yields a sparse vector; both have
   * `indices.length` elements.
   *
   * @param vector vector to be transformed.
   * @return transformed vector.
   * @throws IllegalArgumentException if the vector is neither dense nor sparse.
   */
  override def transform(vector: Vector): Vector = vector match {
    case DenseVector(vs) =>
      Vectors.dense(indices.map(i => vs(i)))

    case SparseVector(_, ids, vs) =>
      val newIndices = new mutable.ArrayBuilder.ofInt
      val newValues = new mutable.ArrayBuilder.ofDouble

      // Look every selected index up on its own instead of merging two sorted sequences:
      // a single forward merge silently drops values when `indices` is unsorted or contains
      // duplicates, which made the sparse result disagree with the dense one.
      // The binary search relies on `ids` being sorted in ascending order.
      var newIdx = 0
      while (newIdx < indices.length) {
        val pos = java.util.Arrays.binarySearch(ids, indices(newIdx))
        if (pos >= 0) {
          newIndices += newIdx
          newValues += vs(pos)
        }
        newIdx += 1
      }

      // `newIdx` only grows, so the collected indices are already strictly increasing.
      Vectors.sparse(indices.length, newIndices.result(), newValues.result())

    case v =>
      throw new IllegalArgumentException("Don't support vector type " + v.getClass)
  }
}
|
||
/**
 * :: Experimental ::
 * Specialized model for the case where all vectors are equivalent, i.e. there is no
 * significant index at all. Every input is mapped to the same zero-length dense vector.
 */
@Experimental
class SignificantSelectorEmptyModel extends SignificantSelectorModel(Array.empty[Int]) {

  /** Zero-length dense vector returned for every input. */
  val empty_vector = Vectors.dense(Array.empty[Double])

  /**
   * Ignores the content of the input.
   *
   * @param vector vector to be transformed (unused).
   * @return the shared zero-length dense vector.
   */
  override def transform(vector: Vector): Vector = empty_vector
}
|
||
/**
 * :: Experimental ::
 * Creates a significant selector: a model that keeps only the vector indices whose value
 * is not the same in every training vector.
 */
@Experimental
class SignificantSelector() {

  /**
   * Returns a significant vector indices selector.
   *
   * The RDD is traversed twice: once to count the vectors and once to collect, per index,
   * the distinct stored values and how often each occurs.
   *
   * @param sources an `RDD[Vector]` containing the vectors.
   * @return a [[SignificantSelectorModel]] holding the significant indices in ascending order,
   *         or a [[SignificantSelectorEmptyModel]] when no index is significant.
   * @throws IllegalArgumentException if a vector is neither dense nor sparse.
   */
  def fit(sources: RDD[Vector]): SignificantSelectorModel = {
    val sourcesCount = sources.count()

    val significantIndices = sources
      .flatMap {
        case DenseVector(vs) =>
          vs.zipWithIndex
        case SparseVector(_, ids, vs) =>
          vs.zip(ids)
        case v =>
          throw new IllegalArgumentException("Don't support vector type " + v.getClass)
      }
      // Count with Long: the totals are compared against `sources.count()`, which is a Long,
      // and Int counters could overflow on very large RDDs.
      .map { case (value, idx) => ((idx, value), 1L) }
      .reduceByKey(_ + _)
      .map { case ((idx, value), count) => (idx, (value, count)) }
      .groupByKey()
      .mapValues { occurrences =>
        val distinctValues = occurrences.map(_._1).toSet
        val storedCount = occurrences.map(_._2).sum

        // A sparse vector does not store its zero elements. If this index was stored in fewer
        // vectors than the RDD contains, the remaining vectors hold an implicit 0.0 here.
        // That implicit zero is one more distinct value, unless 0.0 was also seen explicitly
        // (e.g. in a dense vector), in which case it is already counted.
        val hasUncountedImplicitZero =
          storedCount != sourcesCount && !distinctValues.contains(0.0)

        distinctValues.size + (if (hasUncountedImplicitZero) 1 else 0)
      }
      // Significant means at least two distinct values at this index.
      .filter(_._2 > 1)
      .keys
      .collect()
      .sorted

    // A dedicated model is returned when nothing is significant, so that every input is
    // transformed to the same zero-length dense vector.
    if (significantIndices.nonEmpty) {
      new SignificantSelectorModel(significantIndices)
    } else {
      new SignificantSelectorEmptyModel()
    }
  }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.mllib.feature | ||
|
||
import org.apache.spark.mllib.linalg.Vectors | ||
import org.apache.spark.mllib.util.MLlibTestSparkContext | ||
import org.scalatest.FunSuite | ||
|
||
/**
 * Checks the models fitted by [[SignificantSelector]] against one dense and two sparse
 * probe vectors.
 */
class SignificantSelectorTest extends FunSuite with MLlibTestSparkContext {
  // Probe vectors: a dense vector, its fully populated sparse twin, and a sparse vector
  // with a single stored element.
  val dv = Vectors.dense(1.0, 2.0, 3.0, 4.0, 5.0)
  val sv1 = Vectors.sparse(5, Seq((0, 1.0), (1, 2.0), (2, 3.0), (3, 4.0), (4, 5.0)))
  val sv2 = Vectors.sparse(5, Seq((2, 3.0)))

  // Every index differs between the two training vectors, so nothing is removed.
  test("same result vector") {
    val training = sc.parallelize(List(
      Vectors.dense(0.0, 1.0, 2.0, 3.0, 4.0),
      Vectors.dense(4.0, 5.0, 6.0, 7.0, 8.0)
    ))
    val model = new SignificantSelector().fit(training)

    assert(model.transform(dv) == dv)
    assert(model.transform(sv1) == sv1)
    assert(model.transform(sv2) == sv2)
  }

  // Index 0 is zero everywhere (explicitly in the dense vectors, implicitly in the sparse
  // one), so it is the only index dropped.
  test("shortest result vector") {
    val training = sc.parallelize(List(
      Vectors.dense(0.0, 2.0, 3.0, 4.0),
      Vectors.dense(0.0, 2.0, 3.0, 4.0),
      Vectors.dense(0.0, 2.0, 3.0, 4.0),
      Vectors.sparse(4, Seq((1, 3.0), (2, 4.0))),
      Vectors.dense(0.0, 3.0, 5.0, 4.0),
      Vectors.dense(0.0, 3.0, 7.0, 4.0)
    ))
    val model = new SignificantSelector().fit(training)

    val expectedDense = Vectors.dense(2.0, 3.0, 4.0)
    val expectedSparseFull = Vectors.sparse(3, Seq((0, 2.0), (1, 3.0), (2, 4.0)))
    val expectedSparseSingle = Vectors.sparse(3, Seq((1, 3.0)))

    assert(model.transform(dv) == expectedDense)
    assert(model.transform(sv1) == expectedSparseFull)
    assert(model.transform(sv2) == expectedSparseSingle)
  }

  // Identical training vectors leave no significant index at all.
  test("empty result vector") {
    val training = sc.parallelize(List(
      Vectors.dense(0.0, 2.0, 3.0, 4.0),
      Vectors.dense(0.0, 2.0, 3.0, 4.0)
    ))
    val model = new SignificantSelector().fit(training)

    val expectedEmpty = Vectors.dense(Array[Double]())

    assert(model.transform(dv) == expectedEmpty)
    assert(model.transform(sv1) == expectedEmpty)
    assert(model.transform(sv2) == expectedEmpty)
  }
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can remove this, see line 125.