[SPARK-3261] [MLLIB] KMeans clusterer can return duplicate cluster centers #15450
Conversation
Test build #66816 has finished for PR 15450 at commit
@sethah I wanted to check how strongly against this kind of change you might be, and continue the discussion here.
Test build #67009 has finished for PR 15450 at commit
@srowen I'm not against the change per se, I was just hoping to understand how duplicate centers arise. It seems to me that the problem of duplicate centers would not arise in most real-world use cases, but from comments on the JIRA it appears that assumption could be false. I think it's easier to assess the change if we understand what causes this situation. Do you have any insight?
@sethah I agree that when there are lots of unique points (>> k) then this is almost certain to not happen, and that's most real-world use cases, but the question indeed is what should happen when this is not the case. In that sense, this change only affects corner cases so isn't really a big deal either way.

Yes, the one case is clear: sampling with replacement when the data set has < k (unique) points. It will always return k centroids, so must return duplicates. In this case, every point will be at distance 0 from some centroid and so I don't think the centroids can move apart. It stops in 1 iteration with the degenerate solution, with some centroids assigned 0 points. Not the end of the world but not exactly meaningful.

The more interesting case is k-means||. Of course, again, if there are < k unique points to start, in this case as well, returning k centroids means returning duplicates. Same argument there -- seems to be no value in returning k centroids. This is really the sum of the argument to me, regardless of what Derrick's case is.

A twist: it's possible, but quite improbable, for k-means|| to choose fewer than k unique centroids, when there are >= k distinct points. This is most likely when there are barely more than k distinct points. In that case it's possible that duplicated centroids do get pulled apart and do end up doing something meaningful. I am arguing this case is not worth dealing with because it's rare and it doesn't meaningfully harm the quality of the resulting clustering, but that point is arguable.

I am about 7/10 in favor of the change, certainly the bit about sampling without replacement, but the rest I could drop if there's any significant objection to it.
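To make the sampling argument concrete, here is a minimal plain-Python sketch (not the Spark API, and not the patch's code) contrasting with-replacement initialization, which must return k centers and therefore duplicates when the data has fewer than k distinct points, against without-replacement sampling followed by deduplication. The function names and toy data are illustrative only.

```python
import random

def init_with_replacement(points, k, seed=0):
    # With replacement: always returns exactly k centers, so fewer
    # than k distinct points forces duplicate centroids.
    rng = random.Random(seed)
    return [rng.choice(points) for _ in range(k)]

def init_without_replacement_dedup(points, k, seed=0):
    # Without replacement (capped at the data size), then drop
    # duplicate values while preserving order: at most as many
    # centers as there are distinct points.
    rng = random.Random(seed)
    sample = rng.sample(points, min(k, len(points)))
    seen, centers = set(), []
    for p in sample:
        if p not in seen:
            seen.add(p)
            centers.append(p)
    return centers

data = [(0.0, 0.0), (0.0, 0.0), (1.0, 1.0), (1.0, 1.0)]  # 2 distinct points
with_dups = init_with_replacement(data, k=3)
deduped = init_without_replacement_dedup(data, k=3)
```

With only 2 distinct points and k = 3, the first initializer necessarily repeats a centroid, while the second returns the 2 distinct points only.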
The cases you enumerated are the ones I was thinking of. The changes introduced here would alleviate those problems, I agree. What I'm wondering is if this problem still exists in other cases. If Derrick had 1.3M data points and asked for 10k clusters, and got only 1k unique cluster centers, why did it happen? Is it common/possible for clusters that start in different locations to converge to the same point? I'd be interested to replicate this issue, not sure if I will have the time. Also, I'm slightly inclined to match scikit or R unless we're certain of a clear benefit not to.
@sethah I should say I am not trying to handle cases where clusters start separate and converge to nearly the same point. I don't think that's something we should even try to do. To elaborate, here are the relevant cases, I think. I'm ignoring cases where the data size is >> k because it all behaves as desired with high probability already.

Case 1. Data = [A, B], k = 3, init = random
Case 2. Data = [A, A, B, B], k = 3, init = random
Case 3. Data = [A, B, C, D], k = 3, init = random
Case 4. Data = [A, B], k = 3, init = parallel
Case 5. Data = [A, A, B, B], k = 3, init = parallel
Case 6. Data = [A, B, C, D], k = 3, init = parallel

Cases 1/4/5 are, I think, positive changes. Case 3 seems like a clear win. Case 2 could be made consistent with 1/4/5 easily, I suppose, by calling distinct.

I think Derrick's case is like 2 or 5. There are duplicates in the data and not many unique points. Filtering the duplicates early is a performance win. At least -- that is the case I am trying to solve. If that's not what is described in the JIRA then I think it's a different issue or question. I think there's a benefit to speed and meaningfulness of the model in cases 1/2/4/5.
Aren't all these cases sort of nonsensical anyway? What good is performing clustering on a dataset where the result has (approximately) the same number of clusters as unique data points? The reason I'm clarifying this is because if this arguably beneficial patch only applies to useless applications of clustering, then I would lean towards maintaining the current behavior.
Heh, I believe the PC term is 'corner cases'. I agree. There's not much point in clustering data to k centroids when there are <= k distinct points. I think that's all the more reason not to make the output sillier by returning k centroids. That's really the extent of the question here. I am not sure I understand why you believe the current behavior is better? I'd like to resolve this at last, so unless you would -1 this, I'm going to proceed.
I don't feel strongly either way, but I don't like the potential of this:

scala> model.getK
3
scala> model.clusterCenters.length
1

Should we consider setting the model's k to the actual number of centers?
k is a parameter to the model building process, and I don't think it should change based on the model that comes out. It's the requested or maximum number of centroids, if you like. Or, weigh that against printing the centroids and finding duplicates. Does that make more sense?
…inct centroids aren't available or aren't selected
…ehavior of parallel init
Test build #67188 has finished for PR 15450 at commit
I left a few comments. I will take another look in more detail later on.
@@ -75,7 +75,7 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
// Make sure code runs.
While we're here, can we make var model = ... into val model = ..., and delete L74's val center = ..., which is not used?
@@ -75,7 +75,7 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
  // Make sure code runs.
  var model = KMeans.train(data, k = 2, maxIterations = 1)
- assert(model.clusterCenters.size === 2)
+ assert(model.clusterCenters.size === 1)
These tests don't cover initRandom. We should add both initialization methods to be complete.
I will add more tests, yes.
// Select without replacement; may still produce duplicates if the data has < k distinct
// points, so deduplicate the centroids to match the behavior of k-means|| in the same situation
data.takeSample(false, k, new XORShiftRandom(this.seed).nextInt()).
  map(_.vector).distinct.map(new VectorWithNorm(_))
I think we generally put the . on the next line, but I'm not sure it's a strict style requirement. Also, we end up recomputing the norm of each sample. For large k this can be inefficient. I'm not sure of a non-ugly way to implement the distinct otherwise.
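One way to avoid recomputing norms would be to key the distinct filter on the vector while carrying the precomputed norm along. This is a plain-Python sketch of that idea only, not the patch's Scala; the helper name and data are hypothetical.

```python
import math

def dedup_keep_norms(samples):
    # samples: (vector, norm) pairs whose norms are already computed.
    # Keep the first occurrence of each vector, so no norm is
    # recomputed during deduplication.
    seen, out = set(), []
    for vec, norm in samples:
        if vec not in seen:
            seen.add(vec)
            out.append((vec, norm))
    return out

pts = [(3.0, 4.0), (3.0, 4.0), (0.0, 1.0)]
samples = [(p, math.hypot(*p)) for p in pts]
centers = dedup_keep_norms(samples)
```

The duplicate (3.0, 4.0) sample is dropped and its norm of 5.0 is reused rather than recalculated, which is the trade-off being weighed above.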
I'll move the dot. Agree, though computing k norms once seems negligible in the context of the overall computation.
Ok, I think you're probably right about it being negligible.
Test build #67256 has finished for PR 15450 at commit
@@ -378,10 +382,10 @@ class KMeans private (
  costs.unpersist(blocking = false)
  bcNewCentersList.foreach(_.destroy(false))

- if (centers.size == k) {
+ if (centers.size <= k) {
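The intent of relaxing == to <= can be sketched in plain Python (illustrative only, not Spark's code): after deduplicating the k-means|| candidates, at most k distinct candidates are returned as-is, possibly fewer than k, and only a surplus is reduced down to exactly k. The reducer here is a hypothetical stand-in for the real local k-means++ step.

```python
def finalize_centers(candidates, k, reduce_to_k):
    # Deduplicate candidates first; if at most k remain, return them
    # directly (possibly fewer than k). Only when there is a surplus
    # do we reduce to exactly k centers.
    distinct = list(dict.fromkeys(candidates))  # order-preserving dedup
    if len(distinct) <= k:
        return distinct
    return reduce_to_k(distinct, k)

# Truncation stands in for the real local k-means++ reduction.
few = finalize_centers(["a", "b", "a"], 5, lambda cs, kk: cs[:kk])
many = finalize_centers(["a", "b", "c", "d"], 2, lambda cs, kk: cs[:kk])
```

In the first call only 2 distinct candidates exist, so 2 centers come back even though k = 5; in the second, 4 distinct candidates are reduced to the requested 2.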
There's nothing that enforces these centers to be distinct. The following test(s) fail (IMO these are much more thorough than the existing tests):
test("unique cluster centers") {
val rng = new scala.util.Random(42)
val points = (0 until 10).map(i => Vectors.dense(Array.fill(3)(rng.nextDouble)))
val data = sc.parallelize(
points.flatMap { point =>
Array.fill(rng.nextInt(4))(point)
}, 2
)
val norms = data.map(Vectors.norm(_, 2.0))
val zippedData = data.zip(norms).map { case (v, norm) =>
new VectorWithNorm(v, norm)
}
// less centers than k
val km = new KMeans().setK(50)
.setMaxIterations(10)
.setInitializationMode("k-means||")
.setInitializationSteps(10)
.setSeed(42)
val initialCenters = km.initKMeansParallel(zippedData).map(_.vector)
assert(initialCenters.length === initialCenters.distinct.length)
val model = km.run(data)
val finalCenters = model.clusterCenters
assert(finalCenters.length === finalCenters.distinct.length)
// run local k-means
val km2 = new KMeans().setK(10)
.setMaxIterations(10)
.setInitializationMode("k-means||")
.setInitializationSteps(10)
.setSeed(42)
val initialCenters2 = km2.initKMeansParallel(zippedData).map(_.vector)
assert(initialCenters2.length === initialCenters2.distinct.length)
val model2 = km.run(data)
val finalCenters2 = model2.clusterCenters
assert(finalCenters2.length === finalCenters2.distinct.length)
}
(BTW I had to modify initKMeansParallel to be non-private for that test.)
I guess we can call distinct on the centers before this if statement?
That's right, although guaranteeing distinct centers isn't a goal; it's a nice-to-have where possible. I think my goal is improving the straightforward cases and maintaining as much consistency as possible. I agree with adding a test like this and adding a call to .distinct. Might as well take this to a logical conclusion.
It also raises the interesting question: if you have >= k distinct points, and happen to pick < k distinct centroids, should you go back and replenish the set of centroids? I am punting on that right now but it's a legitimate point. It's quite a corner case though.
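For clarity, the "replenish" idea being punted on could look like the following plain-Python sketch. This is explicitly NOT what the PR implements; the function and data are hypothetical, just to show what topping up the centroid set from unused distinct points would mean.

```python
def replenish(centers, points, k):
    # Hypothetical top-up (deliberately not done in this PR): if fewer
    # than k distinct centers were chosen but the data has >= k distinct
    # points, fill the gap from distinct points not yet chosen.
    chosen = list(dict.fromkeys(centers))      # order-preserving dedup
    for p in dict.fromkeys(points):            # distinct points, in order
        if len(chosen) >= k:
            break
        if p not in chosen:
            chosen.append(p)
    return chosen

# 2 duplicate centers chosen from 4 distinct points, k = 3:
topped_up = replenish(["a", "a"], ["a", "b", "c", "d"], 3)
```

As the comment above notes, this situation (>= k distinct points but < k distinct centroids chosen) is rare, which is the argument for punting.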
Also, if we're going to make this change, we should document in the ML estimator that the algorithm can return fewer than k clusters.
…ladoc about k everywhere; add sethah's new test
Test build #67335 has finished for PR 15450 at commit
@sethah let me know how you feel about it at this stage.
Left some comments, just some cleanup changes to make to the tests really.
@@ -48,7 +48,11 @@ private[clustering] trait KMeansParams extends Params with HasMaxIter with HasFe
final val k = new IntParam(this, "k", "The number of clusters to create. " +
  "Must be > 1.", ParamValidators.gt(1))

/** @group getParam */
/**
 * Number of clusters to create (k). Note that it is possible for fewer than k clusters to
It seems the typical convention for this type of documentation is to attach it to the definition of the param, instead of in both the getter and setter. That way it only appears once.
}

test("unique cluster centers") {
  val rng = new scala.util.Random(42)
Make seed a val and use it instead of the literal 42.
}
// less centers than k
val km = new KMeans().setK(50)
  .setMaxIterations(10)
maybe use 5 or less for max iterations.
}, 2
)
val norms = data.map(Vectors.norm(_, 2.0))
val zippedData = data.zip(norms).map { case (v, norm) =>
Just make this val normedData = data.map(new VectorWithNorm(_)). The zipping was done for performance (not necessary here) and I just copied the code originally.
}

test("fewer clusters than points") {
I'd prefer to remove these two tests since we've added a more thorough test below. We can check the "random" init method in that test as well, then we can eliminate these two.
  .setInitializationSteps(10)
  .setSeed(42)
val initialCenters = km.initKMeansParallel(zippedData).map(_.vector)
assert(initialCenters.length === initialCenters.distinct.length)
Also check initialCenters.length <= numDistinctPoints, where numDistinctPoints is 10 (defined above) at the moment.
  .setInitializationSteps(10)
  .setSeed(42)
val initialCenters2 = km2.initKMeansParallel(zippedData).map(_.vector)
assert(initialCenters2.length === initialCenters2.distinct.length)
also check initialCenters2.length === k
This condition failed, though it should be OK. The problem was that the data setup maps each of 10 distinct points to 0-3 copies, meaning that there may be fewer than 10 distinct points in the end. I just made that 1-3 copies and it works.
Fixed the doc problem too, thanks.
val initialCenters2 = km2.initKMeansParallel(zippedData).map(_.vector)
assert(initialCenters2.length === initialCenters2.distinct.length)

val model2 = km.run(data)
should be val model2 = km2.run(data)
Test build #67512 has finished for PR 15450 at commit
LGTM pending added test case. Thanks a lot!
  .setSeed(seed)
val initialCenters2 = km2.initKMeansParallel(normedData).map(_.vector)
assert(initialCenters2.length === initialCenters2.distinct.length)
assert(initialCenters2.length === 10)
Minor/nit: maybe make k a val here and use that instead. Since we use 10 for something else above, this could become confusing in the future.
// Make sure code runs.
var model = KMeans.train(data, k = 3, maxIterations = 1)
assert(model.clusterCenters.size === 3)

test("unique cluster centers") {
nit: indentation
val data = sc.parallelize(points.flatMap(Array.fill(1 + rng.nextInt(3))(_)), 2)
val normedData = data.map(new VectorWithNorm(_))

// less centers than k
Can we also test the "random" method here? I was going to suggest putting the test cases inside a Seq("k-means||", "random").foreach { initMode => ... } loop, but we run the parallel-specific case separately. Maybe just manually add it?
@sethah done. I also removed references to the runs parameter, which has no effect (and was triggering deprecation warnings). I should have done that last time.
One last minor thing :)
@@ -257,11 +246,6 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
model = KMeans.train(rdd, k = 5, maxIterations = 10)
assert(model.clusterCenters.sortBy(VectorWithCompare(_))
  .zip(points.sortBy(VectorWithCompare(_))).forall(x => x._1 ~== (x._2) absTol 1E-5))

// Neither should more runs
Let's fix the other comments that reference runs. e.g. "No matter how many iterations or runs we use ...."
Test build #67520 has finished for PR 15450 at commit
Done. Let me see one more test pass and then I'll merge. TYVM
Test build #67756 has finished for PR 15450 at commit
merged to master
…ters ## What changes were proposed in this pull request? Return potentially fewer than k cluster centers in cases where k distinct centroids aren't available or aren't selected. ## How was this patch tested? Existing tests Author: Sean Owen <sowen@cloudera.com> Closes apache#15450 from srowen/SPARK-3261.
What changes were proposed in this pull request?
Return potentially fewer than k cluster centers in cases where k distinct centroids aren't available or aren't selected.
How was this patch tested?
Existing tests