[SPARK-8341] Significant selector feature transformation #6795

Closed · wants to merge 3 commits
40 changes: 40 additions & 0 deletions docs/mllib-feature-extraction.md
@@ -617,3 +617,43 @@ println("PCA Mean Squared Error = " + MSE_pca)
{% endhighlight %}
</div>
</div>

## Significant Selector
The idea of this transformation is to safely shrink a large vector (for example, one produced by
HashingTF) in order to reduce the memory needed to work with it.

Fitting this transformation creates a model that keeps only the indices whose values differ
across the fitted vectors. If no indices differ, the resulting model maps every vector to an
empty vector.

### Example
<div class="codetabs">
<div data-lang="scala">
{% highlight scala %}
import org.apache.spark.SparkContext
import org.apache.spark.mllib.feature.{HashingTF, SignificantSelector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

val hashingTF = new HashingTF
val localDocs: Seq[(Double, Array[String])] = Seq(
(1d, "a a b b b c d".split(" ")),
(0d, "a b c d a b c".split(" ")),
(1d, "c b a c b a a".split(" ")))

val docs = sc.parallelize(localDocs, 2)

val tf: RDD[LabeledPoint] = docs.map { case (label, words) => LabeledPoint(label, hashingTF.transform(words))}
// scala> tf.first().features.size
// res4: Int = 1048576

val transformer = new SignificantSelector().fit(tf.map(_.features))

val transformed_tf = tf.map(p => p.copy(features = transformer.transform(p.features)))
// scala> transformed_tf.first().features.size
// res5: Int = 4

// The transformed vectors keep the same informative features but require
// far less memory, e.g. when training a DecisionTree.
{% endhighlight %}
</div>
</div>

@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.feature

import scala.collection.mutable

import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
import org.apache.spark.rdd.RDD

/**
 * :: Experimental ::
 * Model for extracting significant indices from vectors.
 *
 * A significant index is a vector index whose value differs across the fitted vectors.
 *
 * For example, HashingTF produces very large sparse vectors; this model shrinks them to a
 * smaller vector by dropping every index that holds the same value in all vectors.
 *
 * @param indices array of significant indices.
 */
@Experimental
class SignificantSelectorModel(val indices: Array[Int]) extends VectorTransformer {

  /**
   * Applies transformation on a vector.
   *
   * @param vector vector to be transformed.
   * @return transformed vector.
   */
  override def transform(vector: Vector): Vector = vector match {
    case DenseVector(vs) =>
      Vectors.dense(indices.map(vs))

    case SparseVector(_, ids, vs) =>
      // Walk the sorted significant indices and the vector's active indices in
      // lockstep, keeping active values that fall on a significant index and
      // remapping them to their new, compacted positions.
      var sv_idx = 0
      var new_idx = 0
      val elements = new mutable.ListBuffer[(Int, Double)]()

      for (idx <- indices) {
        while (sv_idx < ids.length && ids(sv_idx) < idx) {
          sv_idx += 1
        }
        if (sv_idx < ids.length && ids(sv_idx) == idx) {
          elements += ((new_idx, vs(sv_idx)))
          sv_idx += 1
        }
        new_idx += 1
      }

      Vectors.sparse(indices.length, elements)

    case v =>
      throw new IllegalArgumentException("Unsupported vector type " + v.getClass)
  }
}
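For a quick sense of the remapping, here is a small usage sketch (the input values are illustrative, not taken from the PR) for a model fitted with indices Array(1, 3):

    val model = new SignificantSelectorModel(Array(1, 3))

    // Dense input: positions 1 and 3 are kept and compacted into a 2-element vector.
    model.transform(Vectors.dense(9.0, 2.0, 9.0, 4.0))          // [2.0, 4.0]

    // Sparse input: only active entries at significant indices survive; index 1 is
    // remapped to 0, and the inactive significant index 3 stays implicit.
    model.transform(Vectors.sparse(4, Seq((1, 2.0), (2, 7.0)))) // (2,[0],[2.0])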

/**
 * :: Experimental ::
 * Specialized model for the case where all fitted vectors are identical,
 * i.e. there are no significant indices.
 */
@Experimental
class SignificantSelectorEmptyModel extends SignificantSelectorModel(Array[Int]()) {
Contributor:

Can remove this, see line 125.


  val empty_vector = Vectors.dense(Array[Double]())

  override def transform(vector: Vector): Vector = empty_vector
}

/**
 * :: Experimental ::
 * Fits a significant-indices selector model.
 */
@Experimental
class SignificantSelector() {

  /**
   * Returns a model that selects the significant vector indices.
   *
   * @param sources an `RDD[Vector]` containing the vectors to fit.
   */
  def fit(sources: RDD[Vector]): SignificantSelectorModel = {
    val sources_count = sources.count()
    val significant_indices = sources.flatMap {
      case DenseVector(vs) =>
        vs.zipWithIndex
      case SparseVector(_, ids, vs) =>
        vs.zip(ids)
      case v =>
        throw new IllegalArgumentException("Unsupported vector type " + v.getClass)
    }
    // count occurrences of every (index, value) pair across all vectors
    .map(e => (e.swap, 1))
    .reduceByKey(_ + _)
    .map { case ((idx, value), count) => (idx, (value, count)) }
    .groupByKey()
    .mapValues { e =>
      val values = e.groupBy(_._1)
      val sum = e.map(_._2).sum

      // Distinct explicit values seen at this index. Sparse vectors do not emit
      // their implicit zeros, so when some vectors are missing here
      // (sum != sources_count) and no explicit 0.0 was seen, count the implicit
      // zero as one more distinct value.
      values.size + (if (sum == sources_count || values.contains(0.0)) 0 else 1)
Contributor:

I understand this must be >1 for idx to not be filtered, but I'm not too clear on what the if (..) 0 else 1 is doing. Can you add some comments to describe your logic.

Contributor Author:

Oh, this is a hack for the case where the RDD mixes dense and sparse vectors.

A sparse vector does not store zero elements (0.0), so for a sparse vector values.size only counts the distinct non-zero values.

If the RDD contains both sparse and dense vectors, and a dense vector has a zero element where a sparse vector has no element, the selector would treat them as different values even though they are not.

For example, consider the following code:

    val vectors = sc.parallelize(List(
      Vectors.dense(0.0, 2.0, 3.0, 4.0),
      Vectors.dense(0.0, 2.0, 3.0, 4.0),
      Vectors.dense(0.0, 2.0, 3.0, 4.0),
      Vectors.sparse(4, Seq((1, 3.0), (2, 4.0))),
      Vectors.dense(0.0, 3.0, 5.0, 4.0),
      Vectors.dense(0.0, 3.0, 7.0, 4.0)
    ))

The first element of each vector is zero for the dense vectors and absent for the sparse one. Without this hack, the significant indices would include the first element, because the values look different (zero vs. absent); but if you convert the sparse vector to dense, the first element is no longer significant.

Is it clear now?
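To make the arithmetic concrete, here is a trace of the mapValues expression on that example (six vectors, so sources_count = 6):

    // idx 0: explicit values {0.0 -> 5}; sum = 5 != 6, but 0.0 is present
    //        -> values.size + 0 = 1 -> filtered out (the sparse vector's implicit 0.0 matches)
    // idx 1: explicit values {2.0 -> 3, 3.0 -> 3}; sum = 6
    //        -> values.size + 0 = 2 -> kept
    // idx 2: explicit values {3.0 -> 3, 4.0 -> 1, 5.0 -> 1, 7.0 -> 1}; sum = 6
    //        -> values.size + 0 = 4 -> kept
    // idx 3: explicit values {4.0 -> 5}; sum = 5 != 6 and no explicit 0.0
    //        -> values.size + 1 = 2 -> kept (the implicit 0.0 differs from 4.0)
    //
    // significant indices: [1, 2, 3]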

Contributor:

SparseVector#size should give you the total size of the sparse vector, while SparseVector#numNonzeros gives you the number of nonzero values.

Also, SparseVectors may contain zero elements (e.g. Vectors.sparse(1, Seq((0, 0.0)))); it's just that elements which are not active (in values) are assumed to be zero.

I'm still not clear on what (sum == sources_count || values.contains(0.0)) is testing: the first is true if all the vectors in the RDD were dense and the second is true if any of the vectors in the RDD were dense or if a 0.0 was present in a sparse vector. What are you trying to test here?

I think that you could make the code better by making the handling of sparse/dense more uniform and explicit so other developers can more easily understand. Some suggestions:

  • Break up the chained transformations into some intermediate variables with more descriptive names
  • Add comments to describe what's happening at important parts of the code
  • Converting all vectors to some uniform type so the logic is explicit and the code is more uniform. If everything is converted to dense, then sum == sources_count will always be true. Right now the per-case logic of handling dense and sparse is buried in L115 and is not immediately apparent. (A sketch of this idea follows below.)
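A minimal sketch of that suggestion, assuming all vectors share the same dimension and are densified up front (the names valuesByIndex and significantIndices are illustrative, not from the PR):

    def fit(sources: RDD[Vector]): SignificantSelectorModel = {
      // Uniform representation: densify every vector so the implicit zeros of
      // sparse vectors become explicit and need no special casing.
      val valuesByIndex = sources
        .flatMap(v => v.toArray.zipWithIndex)   // every (value, index) pair, zeros included
        .map { case (value, idx) => (idx, value) }
        .groupByKey()

      // An index is significant iff it takes more than one distinct value.
      val significantIndices = valuesByIndex
        .filter { case (_, values) => values.toSet.size > 1 }
        .keys
        .collect()
        .sorted

      new SignificantSelectorModel(significantIndices)
    }

Note that densifying gives up part of the memory saving for very wide vectors such as HashingTF output, which is presumably why the PR keeps the sparse path and adjusts the count for implicit zeros instead.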

Contributor Author:

Let's be clear about sparse vectors before we can be on the same page about this logic.

From my point of view, a sparse vector is a vector where all zero elements are omitted as a memory optimization. For example, here you can find the same definition:

A vector is a one-dimensional array of elements. The natural C++ implementation of a vector is as a one-dimensional array. However, in many applications, the elements of a vector have mostly zero values. Such a vector is said to be sparse. It is inefficient to use a one-dimensional array to store a sparse vector. It is also inefficient to add elements whose values are zero in forming sums of sparse vectors. Consequently, we should choose a different representation.

    }
    .filter(_._2 > 1)
    .keys
    .collect()
    .sorted

    if (significant_indices.nonEmpty)
      new SignificantSelectorModel(significant_indices)
    else
      new SignificantSelectorEmptyModel()
Contributor:

Can just construct a SignificantSelectorModel(Array()) instead

Contributor Author:

No, because I can't create an empty sparse vector, only an empty dense one.

There are two options here:

  • create a different model for empty indices, or
  • check whether indices is empty on each transform.

I think the first way is better, isn't it?
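For comparison, the second option would keep a single model class and guard inside transform; a rough sketch (selectSparse stands in for the PR's existing sparse-walking logic and is hypothetical):

    override def transform(vector: Vector): Vector =
      if (indices.isEmpty) {
        // MLlib cannot represent a zero-length sparse vector, so every input
        // maps to an empty dense vector.
        Vectors.dense(Array.empty[Double])
      } else {
        vector match {
          case DenseVector(vs) => Vectors.dense(indices.map(vs))
          case SparseVector(_, ids, vs) => selectSparse(ids, vs)
          case v => throw new IllegalArgumentException("Unsupported vector type " + v.getClass)
        }
      }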

Contributor:

I would prefer not to add a public class if all it's doing is handling a special case. Perhaps we should allow sparse vectors to be empty as well. @mengxr thoughts?

Contributor Author:

So, I suggest making both model classes private[mllib].

Do you agree with this?

  }
}
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.feature

import org.scalatest.FunSuite

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLlibTestSparkContext

class SignificantSelectorTest extends FunSuite with MLlibTestSparkContext {
  val dv = Vectors.dense(1, 2, 3, 4, 5)
  val sv1 = Vectors.sparse(5, Seq((0, 1.0), (1, 2.0), (2, 3.0), (3, 4.0), (4, 5.0)))
  val sv2 = Vectors.sparse(5, Seq((2, 3.0)))

test("same result vector") {
val vectors = sc.parallelize(List(
Vectors.dense(0.0, 1.0, 2.0, 3.0, 4.0),
Vectors.dense(4.0, 5.0, 6.0, 7.0, 8.0)
))

val significant = new SignificantSelector().fit(vectors)
assert(significant.transform(dv) == dv)
assert(significant.transform(sv1) == sv1)
assert(significant.transform(sv2) == sv2)
}


test("shortest result vector") {
val vectors = sc.parallelize(List(
Vectors.dense(0.0, 2.0, 3.0, 4.0),
Vectors.dense(0.0, 2.0, 3.0, 4.0),
Vectors.dense(0.0, 2.0, 3.0, 4.0),
Vectors.sparse(4, Seq((1, 3.0), (2, 4.0))),
Vectors.dense(0.0, 3.0, 5.0, 4.0),
Vectors.dense(0.0, 3.0, 7.0, 4.0)
))

val significant = new SignificantSelector().fit(vectors)

val significanted_dv = Vectors.dense(2.0, 3.0, 4.0)
val significanted_sv1 = Vectors.sparse(3, Seq((0, 2.0), (1, 3.0), (2, 4.0)))
val significanted_sv2 = Vectors.sparse(3, Seq((1, 3.0)))

assert(significant.transform(dv) == significanted_dv)
assert(significant.transform(sv1) == significanted_sv1)
assert(significant.transform(sv2) == significanted_sv2)
}

test("empty result vector") {
val vectors = sc.parallelize(List(
Vectors.dense(0.0, 2.0, 3.0, 4.0),
Vectors.dense(0.0, 2.0, 3.0, 4.0)
))

val significant = new SignificantSelector().fit(vectors)

val empty_vector = Vectors.dense(Array[Double]())

assert(significant.transform(dv) == empty_vector)
assert(significant.transform(sv1) == empty_vector)
assert(significant.transform(sv2) == empty_vector)
}
}