
[SPARK-3261] [MLLIB] KMeans clusterer can return duplicate cluster centers #15450


Closed
srowen wants to merge 7 commits into apache:master from srowen:SPARK-3261

Conversation

@srowen (Member) commented Oct 12, 2016

What changes were proposed in this pull request?

Return potentially fewer than k cluster centers in cases where k distinct centroids aren't available or aren't selected.

How was this patch tested?

Existing tests

@SparkQA commented Oct 12, 2016

Test build #66816 has finished for PR 15450 at commit 42279b8.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen (Member, Author) commented Oct 15, 2016

@sethah I wanted to check how strongly you might be against this kind of change, and continue the discussion here.

@SparkQA commented Oct 15, 2016

Test build #67009 has finished for PR 15450 at commit ab486c1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sethah (Contributor) commented Oct 16, 2016

@srowen I'm not against the change per se; I was just hoping to understand how duplicate centers arise. In the case of initRandom, sampling with replacement makes it possible to select the same initial centers, but that should be quite unlikely when there are many more unique data points than requested centers. Even when it happens, the algorithm should move the centers apart unless some of them never have any data assigned. Since the centers are double-valued points in the feature space, when we say duplicate centers do we mean literally identical, or that |c1 - c2|_p < eps for some norm?
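
(To pin down the second notion, something like this sketch -- nearDuplicate and the eps default are illustrative, not anything in the codebase:)

  import org.apache.spark.mllib.linalg.{Vector, Vectors}

  // Strict duplicate: element-wise equal. Loose duplicate: within eps of
  // each other under the Euclidean norm (other norms could be substituted).
  def nearDuplicate(c1: Vector, c2: Vector, eps: Double = 1e-9): Boolean =
    math.sqrt(Vectors.sqdist(c1, c2)) < eps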

It seems to me that the problem of duplicate centers would not arise in most real-world use cases, but from comments on the JIRA it appears that assumption could be false. I think it's easier to assess the change if we understand what causes this situation. Do you have any insight?

@srowen (Member, Author) commented Oct 17, 2016

@sethah I agree that when there are lots of unique points (>> k) this is almost certain not to happen, and that covers most real-world use cases; but the question is indeed what should happen when this is not the case. In that sense, this change only affects corner cases, so it isn't really a big deal either way.

Yes, one case is clear: sampling with replacement when the data set has < k (unique) points. It will always return k centroids, so it must return duplicates. In this case, every point is at distance 0 from some centroid, so I don't think the centroids can move apart. It stops after 1 iteration with the degenerate solution, with some centroids assigned 0 points. Not the end of the world, but not exactly meaningful.

The more interesting case is k-means||. Again, if there are < k unique points to start with, returning k centroids means returning duplicates, and the same argument applies -- there seems to be no value in returning k centroids.
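
To make the degenerate case concrete, here is a minimal repro sketch (assuming a SparkContext sc is in scope; the data is illustrative):

  import org.apache.spark.mllib.clustering.KMeans
  import org.apache.spark.mllib.linalg.Vectors

  // Two distinct points, duplicated, but k = 3. Before this change the model
  // always holds 3 centroids, at least two identical; after it, at most 2.
  val data = sc.parallelize(Seq(
    Vectors.dense(0.0), Vectors.dense(0.0),
    Vectors.dense(1.0), Vectors.dense(1.0)))
  val model = KMeans.train(data, k = 3, maxIterations = 10)
  println(model.clusterCenters.mkString(", "))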

This is really the sum of the argument to me, regardless of what Derrick's case is.

A twist: it's possible, though quite improbable, for k-means|| to choose fewer than k unique centroids even when there are >= k distinct points. This is most likely when there are barely more than k distinct points. In that case it's possible that duplicated centroids do get pulled apart and end up doing something meaningful. I am arguing this case is not worth dealing with because it's rare and doesn't meaningfully harm the quality of the resulting clustering, but that point is arguable.

I am about 7/10 in favor of the change, certainly the bit about sampling without replacement, but the rest I could drop if there's any significant objection to it.

@sethah (Contributor) commented Oct 17, 2016

The cases you enumerated are the ones I was thinking of, and the changes introduced here would alleviate those problems, I agree. What I'm wondering is whether this problem still exists in other cases. If Derrick had 1.3M data points and asked for 10k clusters but got only 1k unique cluster centers, why did it happen? Is it common/possible for clusters that start in different locations to converge to the same point? I'd be interested to try to replicate this issue; not sure I will have the time.

Also, I'm slightly inclined to match scikit or R unless we're certain of a clear benefit not to.

@srowen (Member, Author) commented Oct 18, 2016

@sethah I should say I am not trying to handle cases where clusters start separate and converge to nearly the same point. I don't think that's something we should even try to do.

To elaborate, here are the relevant cases, I think. I'm ignoring cases where the data size is >> k because it all behaves as desired with high probability already.

Case 1. Data = [A, B], k = 3, init = random
Now, you'll get 3 centroids and definitely have a duplicate. After this change, you'd get 2.

Case 2. Data = [A, A, B, B], k = 3, init = random
Now, you'll get 3 centroids and definitely have a duplicate. After this change, same thing.

Case 3. Data = [A, B, C, D], k = 3, init = random
Now, you'll get 3 centroids and occasionally have a duplicate. After this change, you'd always get 3 distinct centroids.

Case 4. Data = [A, B], k = 3, init = parallel
Now, you'll get 3 centroids and definitely have a duplicate. After this change, you should get 2 if I read the code right (that it won't pick a centroid at distance 0 from another).

Case 5. Data = [A, A, B, B], k = 3, init = parallel
Now, you'll get 3 centroids and definitely have a duplicate. After this change, you should get 2 for the same reason above.

Case 6. Data = [A, B, C, D], k = 3, init = parallel
Now, you'll get 3 centroids and occasionally have a duplicate. After this change, same thing.

Cases 1/4/5 are, I think, positive changes. Case 3 seems like a clear win. Case 2 could easily be made consistent with 1/4/5, I suppose, by calling .distinct. Case 6 is the interesting one -- can we make it always return 3 distinct centroids, like case 3, since that's clearly possible? I punted on that though.

I think Derrick's case is like 2 or 5. There are duplicates in the data and not many unique points. Filtering the duplicates early is a performance win. At least -- that is the case I am trying to solve. If that's not what is described in the JIRA then I think it's a different issue or question.

I think there's a benefit to speed and meaningfulness of the model in case 1/2/4/5.
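
For illustration, here is a sketch that runs each of the data sets above under both init modes and reports how many distinct centroids come back (assuming a SparkContext sc in scope):

  import org.apache.spark.mllib.clustering.KMeans
  import org.apache.spark.mllib.linalg.Vectors

  val Seq(a, b, c, d) = Seq(0.0, 1.0, 2.0, 3.0).map(x => Vectors.dense(x))

  for {
    points <- Seq(Seq(a, b), Seq(a, a, b, b), Seq(a, b, c, d))
    initMode <- Seq(KMeans.RANDOM, KMeans.K_MEANS_PARALLEL)
  } {
    val model = new KMeans()
      .setK(3)
      .setMaxIterations(5)
      .setInitializationMode(initMode)
      .run(sc.parallelize(points))
    val centers = model.clusterCenters
    println(s"$initMode, ${points.size} points: ${centers.length} centroids, " +
      s"${centers.distinct.length} distinct")
  }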

@sethah (Contributor) commented Oct 18, 2016

Aren't all these cases sort of nonsensical anyway? What good is clustering a dataset where the result has (approximately) as many clusters as there are unique data points?

The reason I'm clarifying this is because if this arguably beneficial patch only applies to useless applications of clustering, then I would lean towards maintaining the current behavior.

@srowen (Member, Author) commented Oct 18, 2016

Heh, I believe the PC term is 'corner cases'. I agree: there's not much point in clustering data into k centroids when there are <= k distinct points. I think that's all the more reason not to make the output sillier by returning k centroids. That's really the extent of the question here. I am not sure I understand why you believe the current behavior is better?

I'd like to resolve this at last, so unless you would -1 this, I'm going to proceed.

@sethah (Contributor) commented Oct 18, 2016

I don't feel strongly either way, but I don't like the potential of this:

scala> model.getK
res0: Int = 3

scala> model.clusterCenters.length
res1: Int = 1

Should we consider setting the model k to the number of clusters at the end of training? Or is there some utility to knowing that the number of clusters was less than the requested k?

@srowen (Member, Author) commented Oct 19, 2016

k is a parameter to the model-building process, and I don't think it should change based on the model that comes out. It's the requested, or maximum, number of centroids, if you like. Or, weigh that against printing the centroids and finding duplicates among them. Does that make more sense?

@SparkQA commented Oct 19, 2016

Test build #67188 has finished for PR 15450 at commit 85c9857.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sethah (Contributor) left a comment

I left a few comments. I will take another look in more detail later on.

@@ -75,7 +75,7 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {

// Make sure code runs.
sethah (Contributor):

While we're here, can we change var model = ... to val model = ..., and delete L74's val center = ..., which is unused?

@@ -75,7 +75,7 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {

// Make sure code runs.
var model = KMeans.train(data, k = 2, maxIterations = 1)
-    assert(model.clusterCenters.size === 2)
+    assert(model.clusterCenters.size === 1)
sethah (Contributor):

These tests don't cover initRandom. We should test both initialization methods to be complete.

srowen (Member, Author):

I will add more tests, yes.

// Select without replacement; may still produce duplicates if the data has < k distinct
// points, so deduplicate the centroids to match the behavior of k-means|| in the same situation
data.takeSample(false, k, new XORShiftRandom(this.seed).nextInt()).
map(_.vector).distinct.map(new VectorWithNorm(_))
sethah (Contributor):

I think we generally put the . on the next line, but I'm not sure it's a strict style requirement.

Also, we end up recomputing the norm of each sample. For large k this can be inefficient. I'm not sure of a non-ugly way to implement the distinct otherwise.
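
One possible shape for it, as an untested sketch: deduplicate on the underlying vector but keep one VectorWithNorm per group, so the cached norms are reused:

  // Untested sketch: dedupe on the vector without recomputing norms.
  data.takeSample(false, k, new XORShiftRandom(this.seed).nextInt())
    .groupBy(_.vector)
    .values
    .map(_.head)
    .toArray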

srowen (Member, Author):

I'll move the dot. Agree, though computing k norms once seems negligible in the context of the overall computation.

sethah (Contributor):

Ok, I think you're probably right about it being negligible.

@SparkQA commented Oct 20, 2016

Test build #67256 has finished for PR 15450 at commit ebebcb9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -378,10 +382,10 @@ class KMeans private (
costs.unpersist(blocking = false)
bcNewCentersList.foreach(_.destroy(false))

-    if (centers.size == k) {
+    if (centers.size <= k) {
sethah (Contributor) commented Oct 20, 2016:

There's nothing that ensures these centers are distinct. The following test(s) fail (IMO these are much more thorough than the existing tests):

  test("unique cluster centers") {
    val rng = new scala.util.Random(42)
    val points = (0 until 10).map(i => Vectors.dense(Array.fill(3)(rng.nextDouble)))
    val data = sc.parallelize(
      points.flatMap { point =>
        Array.fill(rng.nextInt(4))(point)
      }, 2
    )
    val norms = data.map(Vectors.norm(_, 2.0))
    val zippedData = data.zip(norms).map { case (v, norm) =>
      new VectorWithNorm(v, norm)
    }
    // less centers than k
    val km = new KMeans().setK(50)
      .setMaxIterations(10)
      .setInitializationMode("k-means||")
      .setInitializationSteps(10)
      .setSeed(42)
    val initialCenters = km.initKMeansParallel(zippedData).map(_.vector)
    assert(initialCenters.length === initialCenters.distinct.length)

    val model = km.run(data)
    val finalCenters = model.clusterCenters
    assert(finalCenters.length === finalCenters.distinct.length)

    // run local k-means
    val km2 = new KMeans().setK(10)
      .setMaxIterations(10)
      .setInitializationMode("k-means||")
      .setInitializationSteps(10)
      .setSeed(42)
    val initialCenters2 = km2.initKMeansParallel(zippedData).map(_.vector)
    assert(initialCenters2.length === initialCenters2.distinct.length)

    val model2 = km.run(data)
    val finalCenters2 = model2.clusterCenters
    assert(finalCenters2.length === finalCenters2.distinct.length)
  }

(BTW I had to modify initKMeansParallel to be non-private for that test)

I guess we can call distinct on the centers before this if statement?
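
Something like this, perhaps (a sketch; distinctCenters is just an illustrative name):

  // Deduplicate the candidate centers before deciding whether to refine.
  val distinctCenters = centers.map(_.vector).distinct.map(new VectorWithNorm(_))
  if (distinctCenters.size <= k) {
    distinctCenters.toArray
  } else {
    // ... otherwise run the local k-means++ refinement on distinctCenters ...
  }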

srowen (Member, Author):

That's right, although guaranteeing distinct centers isn't a goal per se -- it's a nice-to-have where possible. My goal is to improve the straightforward cases while maintaining as much consistency as possible. I agree with adding a test like this and adding a call to .distinct. Might as well take this to its logical conclusion.

It also raises the interesting question: if you have >= k distinct points, and happen to pick < k distinct centroids, should you go back and replenish the set of centroids? I am punting on that right now but it's a legitimate point. It's quite a corner case though.

@sethah (Contributor) commented Oct 21, 2016

Also, if we're going to make this change, we should document in the ML estimator that the algorithm can return fewer than k centers.

…ladoc about k everywhere; add sethah's new test
@SparkQA commented Oct 21, 2016

Test build #67335 has finished for PR 15450 at commit 793e4d5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen (Member, Author) commented Oct 24, 2016

@sethah let me know how you feel about it at this stage

@sethah (Contributor) left a comment

Left some comments, just some cleanup changes to make to the tests really.

@@ -48,7 +48,11 @@ private[clustering] trait KMeansParams extends Params with HasMaxIter with HasFe
final val k = new IntParam(this, "k", "The number of clusters to create. " +
"Must be > 1.", ParamValidators.gt(1))

-  /** @group getParam */
+  /**
+   * Number of clusters to create (k). Note that it is possible for fewer than k clusters to
sethah (Contributor):

It seems the typical convention for this type of documentation is to attach it to the definition of the param, instead of in both the getter and setter. That way it only appears once.
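
Concretely, attaching it to the param definition might look something like this (a sketch extending the definition shown above):

  final val k = new IntParam(this, "k", "The number of clusters to create. " +
    "Must be > 1. Note that it is possible for fewer than k clusters to be " +
    "returned, for example, if there are fewer than k distinct points to cluster.",
    ParamValidators.gt(1))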

}

test("unique cluster centers") {
val rng = new scala.util.Random(42)
sethah (Contributor):

make seed a val and use it instead of literal 42.

}
// less centers than k
val km = new KMeans().setK(50)
.setMaxIterations(10)
sethah (Contributor):

Maybe use 5 or fewer for maxIterations.

}, 2
)
val norms = data.map(Vectors.norm(_, 2.0))
val zippedData = data.zip(norms).map { case (v, norm) =>
sethah (Contributor):

Just make this val normedData = data.map(new VectorWithNorm(_)). The zipping was done for performance (not necessary here) and I just copied the code originally.

}


test("fewer clusters than points") {
sethah (Contributor):

I'd prefer to remove these two tests since we've added a more thorough test below. We can check the "random" init method in that test as well, then we can eliminate these two.

.setInitializationSteps(10)
.setSeed(42)
val initialCenters = km.initKMeansParallel(zippedData).map(_.vector)
assert(initialCenters.length === initialCenters.distinct.length)
sethah (Contributor):

also check initialCenters.length <= numDistinctPoints where numDistinctPoints is 10 (defined above) at the moment.

.setInitializationSteps(10)
.setSeed(42)
val initialCenters2 = km2.initKMeansParallel(zippedData).map(_.vector)
assert(initialCenters2.length === initialCenters2.distinct.length)
sethah (Contributor) commented Oct 24, 2016:

also check initialCenters2.length === k

srowen (Member, Author):

This condition failed, though it should be OK. The problem was that the data setup mapped each of 10 distinct points to 0-3 copies, meaning there could be fewer than 10 distinct points in the end. I made that 1-3 copies and it works.

Fixed the doc problem too, thanks.

val initialCenters2 = km2.initKMeansParallel(zippedData).map(_.vector)
assert(initialCenters2.length === initialCenters2.distinct.length)

val model2 = km.run(data)
sethah (Contributor):

should be val model2 = km2.run(data)

@SparkQA commented Oct 25, 2016

Test build #67512 has finished for PR 15450 at commit d1004d9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sethah (Contributor) left a comment

LGTM pending the added test case. Thanks a lot!

.setSeed(seed)
val initialCenters2 = km2.initKMeansParallel(normedData).map(_.vector)
assert(initialCenters2.length === initialCenters2.distinct.length)
assert(initialCenters2.length === 10)
sethah (Contributor):

Minor/nit: maybe make k a val here and use that instead. Since we use 10 for something else above, this could become confusing in the future.

// Make sure code runs.
var model = KMeans.train(data, k = 3, maxIterations = 1)
assert(model.clusterCenters.size === 3)
test("unique cluster centers") {
sethah (Contributor):

nit: indentation

val data = sc.parallelize(points.flatMap(Array.fill(1 + rng.nextInt(3))(_)), 2)
val normedData = data.map(new VectorWithNorm(_))

// less centers than k
sethah (Contributor):

Can we also test the "random" method here? I was going to suggest putting the test cases inside Seq("k-means||", "random").foreach { initMode =>, but part of the test calls the parallel-specific initKMeansParallel. Maybe just add it manually?

@srowen (Member, Author) commented Oct 25, 2016

@sethah done. I also removed references to the runs parameter, which has no effect (and was triggering deprecation warnings). I should have done that last time.

@sethah (Contributor) left a comment

One last minor thing :)

@@ -257,11 +246,6 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
model = KMeans.train(rdd, k = 5, maxIterations = 10)
assert(model.clusterCenters.sortBy(VectorWithCompare(_))
.zip(points.sortBy(VectorWithCompare(_))).forall(x => x._1 ~== (x._2) absTol 1E-5))

-    // Neither should more runs
sethah (Contributor):

Let's fix the other comments that reference runs. e.g. "No matter how many iterations or runs we use ...."

@SparkQA commented Oct 25, 2016

Test build #67520 has finished for PR 15450 at commit 79c84ad.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen (Member, Author) left a comment

Done. Let me see one more test pass and then I'll merge. TYVM

@SparkQA commented Oct 29, 2016

Test build #67756 has finished for PR 15450 at commit f870fe9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen (Member, Author) commented Oct 30, 2016

merged to master

@asfgit asfgit closed this in a489567 Oct 30, 2016
@srowen srowen deleted the SPARK-3261 branch October 31, 2016 10:11
robert3005 pushed a commit to palantir/spark that referenced this pull request Nov 1, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017