[SPARK-16473][MLLIB] Fix BisectingKMeans Algorithm failing in edge case #16355
Conversation
Fix BisectingKMeans Algorithm failing in edge case where no children exist in updateAssignments
Jenkins, test this please.
@imatiach-msft Can you add a test case?
@imatiach-msft, thanks for creating the pull request and committing the change I shared. I will try to share a sample dataset for this issue.
@@ -339,10 +339,14 @@ private object BisectingKMeans extends Serializable {
    assignments.map { case (index, v) =>
      if (divisibleIndices.contains(index)) {
        val children = Seq(leftChildIndex(index), rightChildIndex(index))
        val selected = children.minBy { child =>
          KMeans.fastSquaredDistance(newClusterCenters(child), v)
        if (children.length > 0) {
Nit: you could write children.nonEmpty, but when would it not have length 2? It's initialized in the line above.
I can see the following way to get a java.util.NoSuchElementException in the original code: leftChildIndex(index) returns 2 * index and rightChildIndex(index) returns 2 * index + 1, and the exception is thrown when the Map newClusterCenters has no entry for leftChildIndex(index) or rightChildIndex(index).
We need the check newClusterCenters.contains(child) here to avoid this issue.
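A minimal sketch of the guarded assignment logic with that check in place - variable names follow the diff above and the filter line quoted later in this thread; this is a sketch of the discussed fix, not the verbatim merged source:

```scala
assignments.map { case (index, v) =>
  if (divisibleIndices.contains(index)) {
    val children = Seq(leftChildIndex(index), rightChildIndex(index))
    // Keep only children for which a center was actually computed; a child can be
    // missing from newClusterCenters when a split left one side empty.
    val newClusterChildren = children.filter(newClusterCenters.contains(_))
    if (newClusterChildren.nonEmpty) {
      val selected = newClusterChildren.minBy { child =>
        KMeans.fastSquaredDistance(newClusterCenters(child), v)
      }
      (selected, v)
    } else {
      // No surviving child center: leave the point in its current cluster.
      (index, v)
    }
  } else {
    (index, v)
  }
}
```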
I added a filter on whether newClusterCenters contains each child. Would you be able to send me a sample dataset so I can validate the fix? Thank you!
Good point. It looks like we should be checking if the map contains the child or not. However, I'm not sure if that is the correct solution either. I need a repro dataset from the bug reporter.
That makes more sense as a fix, yes. Sounds like there is still a to-do to verify the fix. If it's possible to write a simple unit test to cover it, all the better.
Yep, there is still a TODO to verify the fix. I'm waiting for the dataset from Alok to reproduce the issue.
You can get sample vectors here: https://github.com/alokob/SparkClusteringDataSet/SampleVectors.txt. Also, while executing bisecting k-means we set the following configuration: BisectingKMeans.setK(100). Please let me know if any additional details are needed.
@imatiach-msft Did you find the dataset suitable? Is anything else needed from my side?
Hi Alok!
That's ok, enjoy Xmas.
I have very good news :). I was not only able to repro the issue with your dataset, but I was also able to verify that with the suggested fix the algorithm does not fail (adding val newClusterChildren = children.filter(newClusterCenters.contains(_)) fixed the issue). I still need to figure out how to add the test case to Spark, though - my understanding is that checking in dataset files is not allowed? My test code (not cleaned up yet) imported from org.apache.spark.mllib.linalg and org.apache.spark.ml.linalg, declared @transient var loadedDataset: Dataset[_] = _, and loaded the sample vectors in beforeAll().
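A hedged reconstruction of that truncated test setup, under the assumption that it followed the usual KMeansSuite pattern - the file path, the TestRow wrapper, and the line parsing are guesses, not the original snippet:

```scala
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.Dataset

@transient var loadedDataset: Dataset[_] = _

override def beforeAll(): Unit = {
  super.beforeAll()
  // Hypothetical: parse the reporter's SampleVectors.txt into dense vectors,
  // one comma-separated vector per line.
  val rdd = sc.textFile("SampleVectors.txt")
    .map(line => TestRow(Vectors.dense(line.split(",").map(_.toDouble))))
  loadedDataset = spark.createDataFrame(rdd)
}
```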
ok to test |
Nice to know that the code fix I suggested is working. It's really nice to contribute to Spark.
Test build #70679 has finished for PR 16355 at commit
…c sparse data which can generate the exception the user encountered.
I've updated with a new commit. I was able to reproduce the issue by generating a synthetic sparse dataset similar to the one Alok sent me, in accordance with the style of Spark test methods. Hence, I was able to verify the fix. Please review the new code changes and let me know who else needs to review before the changes can be merged.
Jenkins, retest this please
Test build #70682 has finished for PR 16355 at commit
Jenkins, retest this please
Test build #70688 has finished for PR 16355 at commit
@jkbradley @srowen any comments on the changes? Thank you!
@yu-iskw Pinging on this since you wrote bisecting k-means originally. Do you have time to take a look? Thanks!
The only problem I see is that with this code we generate k-1 clusters instead of k, but the algorithm documentation states that it is not guaranteed to generate exactly k clusters - it can be fewer if the leaf clusters are not divisible (see spark/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala): "Iteratively it finds divisible clusters on the bottom level and bisects each of them using k-means." It seems that in the dataset Alok gave, one of the clusters that was assumed to be divisible was divided into two clusters, one containing all the points and the other none, which is what caused the error (his cluster 162, child of 81, was empty, but cluster 163 was non-empty after reassignment). My intuition is that it should be possible for the algorithm to account for failed splits and then try to split a different leaf node instead, but that change seems non-trivial to me.
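To make that concrete, here is the index arithmetic behind the example (the indices 81/162/163 are the ones reported for Alok's run; the two helper definitions mirror the binary-tree indexing discussed earlier in this thread):

```scala
// BisectingKMeans addresses the cluster tree with implicit binary-heap indices:
def leftChildIndex(index: Long): Long = 2 * index
def rightChildIndex(index: Long): Long = 2 * index + 1

val parent = 81L
val left = leftChildIndex(parent)   // 162: ended up empty after reassignment
val right = rightChildIndex(parent) // 163: received all of the parent's points
// No center is computed for the empty child, so newClusterCenters contains 163
// but not 162, and newClusterCenters(162) throws
// java.util.NoSuchElementException: key not found: 162
```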
Thanks for the updates! I agree we shouldn't try to retry failed splits in this PR.
override def beforeAll(): Unit = {
  super.beforeAll()
  dataset = KMeansSuite.generateKMeansData(spark, 50, 3, k)
  sparseDataset = KMeansSuite.generateSparseData(spark, 100, 1000, k, 42)
Does the test really need to be this large? It takes ~1 sec, which is long-ish for a unit test.
I can only repro the issue with very sparse data where the number of columns is around 1k. However, I was able to reduce the number of rows to only 10. I hope that is a small enough dataset.
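For reference, a sketch of what the reduced setup would look like (10 rows by 1000 columns per the comment above; the signature without k anticipates the later review comment about the unused parameter):

```scala
override def beforeAll(): Unit = {
  super.beforeAll()
  dataset = KMeansSuite.generateKMeansData(spark, 50, 3, k)
  // 10 rows of very sparse 1000-dimensional vectors are enough to hit the bug.
  sparseDataset = KMeansSuite.generateSparseData(spark, 10, 1000, 42)
}
```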
val bkm = new BisectingKMeans().setK(k).setMinDivisibleClusterSize(4).setMaxIter(4)

assert(bkm.getK === k)
assert(bkm.getFeaturesCol === "features")
There's no need to test featuresCol, predictionCol, and other things which aren't relevant to this unit test. I'd simplify it.
done, removed
assert(bkm.getPredictionCol === "prediction")
assert(bkm.getMaxIter === 4)
assert(bkm.getMinDivisibleClusterSize === 4)
// Verify fit does not fail on very sparse data
It's not clear to me that this unit test actually tests the issue fixed in this PR. Is there a good way to see why it would? If not, then it would be great to write a tiny dataset by hand which would trigger the failure.
I added this check to verify:
// Verify we hit the edge case
assert(numClusters < k && numClusters > 1)
The issue only occurs for very sparse data, but it occurs very consistently (almost all of the very sparse data that I generate triggers the error).
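Assembling the snippets from this review, the regression test plausibly ends up as below (a sketch built from the pieces quoted in this thread, including the seed that gets added later; not necessarily the verbatim merged test):

```scala
test("SPARK-16473: Verify Bisecting K-Means does not fail in edge case where " +
    "one cluster is empty after split") {
  val bkm = new BisectingKMeans()
    .setK(k)
    .setMinDivisibleClusterSize(4)
    .setMaxIter(4)
    .setSeed(123)

  // Verify fit does not fail on very sparse data
  val model = bkm.fit(sparseDataset)
  val result = model.transform(sparseDataset)
  val numClusters = result.select("prediction").distinct().collect().length
  // Verify we hit the edge case: the failed split leaves fewer than k clusters
  assert(numClusters < k && numClusters > 1)
}
```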
@@ -160,6 +162,17 @@ object KMeansSuite {
    spark.createDataFrame(rdd)
  }

  def generateSparseData(spark: SparkSession, rows: Int, dim: Int, k: Int, seed: Int): DataFrame = {
k is never used
modified the method to use it
Just 2 small comments left. Thanks!
"one cluster is empty after split") { | ||
val bkm = new BisectingKMeans().setK(k).setMinDivisibleClusterSize(4).setMaxIter(4) | ||
|
||
assert(bkm.getK === k) |
Don't bother testing getK, getMaxIter, or getMinDivisibleClusterSize. The setters should be tested elsewhere, so you may assume they work here.
done, removed
val nnz = random.nextInt(dim)
val rdd = sc.parallelize(1 to rows)
  .map(i => Vectors.sparse(dim, random.shuffle(0 to dim - 1).slice(0, nnz).sorted.toArray,
    Array.fill(nnz)(random.nextInt(k).toDouble)))
I don't understand this use of k. The feature value can be any random number. I'd remove k.
done, removed k
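With k removed, a self-contained sketch of the helper - TestRow is assumed to be the row case class KMeansSuite already uses; this is reconstructed from the diff above and hedged accordingly:

```scala
import scala.util.Random

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{DataFrame, SparkSession}

def generateSparseData(spark: SparkSession, rows: Int, dim: Int, seed: Int): DataFrame = {
  val sc = spark.sparkContext
  val random = new Random(seed)
  val nnz = random.nextInt(dim)  // one shared non-zero count for every row
  val rdd = sc.parallelize(1 to rows)
    .map(i => Vectors.sparse(dim,
      random.shuffle(0 to dim - 1).slice(0, nnz).sorted.toArray,
      Array.fill(nnz)(random.nextDouble())))
    .map(v => TestRow(v))
  spark.createDataFrame(rdd)
}
```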
@jkbradley thanks, I've updated the code based on your latest comments - I removed k and the verification for the setters.
Test build #71289 has finished for PR 16355 at commit
ping @jkbradley would you be able to take another look at the bisecting K-Means fix?
test this please |
@@ -51,6 +54,18 @@ class BisectingKMeansSuite
    assert(copiedModel.hasSummary)
  }

  test("SPARK-16473: Verify Bisecting K-Means does not fail in edge case where " +
      "one cluster is empty after split") {
    val bkm = new BisectingKMeans().setK(k).setMinDivisibleClusterSize(4).setMaxIter(4)
Please set the seed too since this test seems at risk of flakiness
done, added seed
I was about to say this is ready, but I do think we should add the seed. Other than that, this should be ready!
@jkbradley done, added seed. Thanks!
Test build #71533 has finished for PR 16355 at commit
Test build #71538 has finished for PR 16355 at commit
Test build #3538 has finished for PR 16355 at commit
ping @jkbradley would you be able to take another look at the bisecting kmeans model? I've updated with the random seed as requested.
ping @jkbradley would you be able to take another look at the bisecting kmeans model? I've updated with the random seed as requested, and the build succeeded. Thank you!
ping @jkbradley would you be able to take another look at the bisecting kmeans model? Thanks!
LGTM
Test build #3548 has finished for PR 16355 at commit
Merging with master. Will try to backport to branch-2.1 as well.
I was able to check out this commit and test it with branch-2.1, but now I can't get the merge script to merge it for branch-2.1. @srowen would you mind trying? Thanks!
I just had some issues with that too. But manually merging (git cherry-pick + git push) seems to still work, so maybe try that.
It's an apache-github sync issue (see https://github.com/apache/spark/commits/branch-2.1). I'll cherry-pick onto apache/branch-2.1 and push, as that might also kick the sync to try again.
[SPARK-16473][MLLIB] Fix BisectingKMeans Algorithm failing in edge case where no children exist in updateAssignments
Done, and it synced now. Merged to master/2.1 |
Oh OK! Thanks @srowen |
[SPARK-16473][MLLIB] Fix BisectingKMeans Algorithm failing in edge case where no children exist in updateAssignments
What changes were proposed in this pull request?
Fix a bug in which BisectingKMeans fails with error:
java.util.NoSuchElementException: key not found: 166
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:58)
at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1$$anonfun$2.apply$mcDJ$sp(BisectingKMeans.scala:338)
at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1$$anonfun$2.apply(BisectingKMeans.scala:337)
at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1$$anonfun$2.apply(BisectingKMeans.scala:337)
at scala.collection.TraversableOnce$$anonfun$minBy$1.apply(TraversableOnce.scala:231)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at scala.collection.LinearSeqOptimized$class.reduceLeft(LinearSeqOptimized.scala:125)
at scala.collection.immutable.List.reduceLeft(List.scala:84)
at scala.collection.TraversableOnce$class.minBy(TraversableOnce.scala:231)
at scala.collection.AbstractTraversable.minBy(Traversable.scala:105)
at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1.apply(BisectingKMeans.scala:337)
at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1.apply(BisectingKMeans.scala:334)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
How was this patch tested?
The dataset was run against the code change to verify that the code works. I will try to add unit tests to the code.
Author: Ilya Matiach <ilmat@microsoft.com>
Closes #16355 from imatiach-msft/ilmat/fix-kmeans.