
SPARK-1310: Start adding k-fold cross validation to MLLib [adds kFold to MLUtils & fixes bug in BernoulliSampler] #18


Closed · wants to merge 22 commits
Changes from all commits
@@ -69,7 +69,12 @@ class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false)
}
}

override def clone = new BernoulliSampler[T](lb, ub)
/**
* Return a sampler that is the complement of the range specified by the current sampler.
*/
def cloneComplement(): BernoulliSampler[T] = new BernoulliSampler[T](lb, ub, !complement)

override def clone = new BernoulliSampler[T](lb, ub, complement)
Contributor: Add return type.

}

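For intuition, here is a standalone sketch in plain Scala (not the Spark sampler itself) of the range rule the suite below exercises: an element whose uniform draw x falls in the half-open range [lb, ub) is kept, and the complement sampler produced by cloneComplement() keeps exactly the elements the original rejects, so the two samplers together partition the input.

// Standalone illustration; the real BernoulliSampler lives in org.apache.spark.util.random.
object ComplementSketch {
  // Keep an element when its uniform draw x satisfies lb <= x < ub;
  // the complement flag inverts that decision.
  def keep(x: Double, lb: Double, ub: Double, complement: Boolean): Boolean = {
    val inRange = x >= lb && x < ub
    if (complement) !inRange else inRange
  }

  def main(args: Array[String]): Unit = {
    // The same draws the mocked Random feeds to the samplers in the suite below.
    val draws = Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)
    val kept = draws.filter(keep(_, 0.25, 0.55, complement = false)) // 0.3, 0.4, 0.5
    val rest = draws.filter(keep(_, 0.25, 0.55, complement = true))  // all of the others
    assert((kept ++ rest).sorted == draws.sorted) // together they cover every draw exactly once
  }
}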
@@ -41,21 +41,31 @@ class RandomSamplerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
random.nextDouble().andReturn(x)
}
}
whenExecuting(random)
{
whenExecuting(random) {
val sampler = new BernoulliSampler[Int](0.25, 0.55)(random)
assert(sampler.sample(a.iterator).toList == List(3, 4, 5))
}
}

test("BernoulliSamplerWithRangeInverse") {
expecting {
for(x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) {
random.nextDouble().andReturn(x)
}
}
whenExecuting(random) {
val sampler = new BernoulliSampler[Int](0.25, 0.55, true)(random)
assert(sampler.sample(a.iterator).toList === List(1, 2, 6, 7, 8, 9))
}
}

test("BernoulliSamplerWithRatio") {
expecting {
for(x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) {
random.nextDouble().andReturn(x)
}
}
whenExecuting(random)
{
whenExecuting(random) {
val sampler = new BernoulliSampler[Int](0.35)(random)
assert(sampler.sample(a.iterator).toList == List(1, 2, 3))
}
@@ -67,8 +77,7 @@ class RandomSamplerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
random.nextDouble().andReturn(x)
}
}
whenExecuting(random)
{
whenExecuting(random) {
val sampler = new BernoulliSampler[Int](0.25, 0.55, true)(random)
assert(sampler.sample(a.iterator).toList == List(1, 2, 6, 7, 8, 9))
}
@@ -78,8 +87,7 @@ class RandomSamplerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
expecting {
random.setSeed(10L)
}
whenExecuting(random)
{
whenExecuting(random) {
val sampler = new BernoulliSampler[Int](0.2)(random)
sampler.setSeed(10L)
}
21 changes: 21 additions & 0 deletions mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -17,11 +17,16 @@

package org.apache.spark.mllib.util

import scala.reflect.ClassTag
Contributor: organize imports.


import breeze.linalg.{Vector => BV, SparseVector => BSV, squaredDistance => breezeSquaredDistance}

import org.apache.spark.annotation.Experimental
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.PartitionwiseSampledRDD
import org.apache.spark.SparkContext._
import org.apache.spark.util.random.BernoulliSampler
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

@@ -157,6 +162,22 @@ object MLUtils {
dataStr.saveAsTextFile(dir)
}

/**
* Return a k element array of pairs of RDDs, where the first element of each pair
* contains the training data (the complement of the validation data) and the second
* element contains the validation data, a unique 1/kth of the data, with k = numFolds.
*/
def kFold[T: ClassTag](rdd: RDD[T], numFolds: Int, seed: Int): Array[(RDD[T], RDD[T])] = {
Contributor: It is natural to have training before validation in the return array because you always need to use the training set first.

val numFoldsF = numFolds.toFloat
(1 to numFolds).map { fold =>
val sampler = new BernoulliSampler[T]((fold - 1) / numFoldsF, fold / numFoldsF,
complement = false)
val validation = new PartitionwiseSampledRDD(rdd, sampler, seed)
val training = new PartitionwiseSampledRDD(rdd, sampler.cloneComplement(), seed)
(training, validation)
}.toArray
}

/**
* Returns the squared Euclidean distance between two vectors. The following formula will be used
* if it does not introduce too much numerical error:
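To make the intended call pattern concrete, here is a usage sketch; only MLUtils.kFold comes from this patch, while the SparkContext setup and the evaluate stand-in are hypothetical. Each array element is consumed as (training, validation), matching the ordering discussed in the review comment above.

import org.apache.spark.SparkContext
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

object KFoldUsageSketch {
  // Stand-in for whatever train/score logic a caller would supply.
  def evaluate(training: RDD[Int], validation: RDD[Int]): Double =
    validation.count().toDouble / (training.count() + validation.count())

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "kFoldUsageSketch")
    val data = sc.parallelize(1 to 100, 2)
    // Train on the first RDD of each pair, score on the held-out second RDD.
    val scores = MLUtils.kFold(data, numFolds = 5, seed = 11).map {
      case (training, validation) => evaluate(training, validation)
    }
    println(s"mean score across folds = ${scores.sum / scores.length}")
    sc.stop()
  }
}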
@@ -19,6 +19,9 @@ package org.apache.spark.mllib.util

import java.io.File

import scala.math
Contributor: insert an empty line between java imports and scala imports.

import scala.util.Random

import org.scalatest.FunSuite

import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, norm => breezeNorm,
@@ -93,4 +96,40 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext {
case t: Throwable =>
}
}

test("kFold") {
val data = sc.parallelize(1 to 100, 2)
val collectedData = data.collect().sorted
val twoFoldedRdd = MLUtils.kFold(data, 2, 1)
assert(twoFoldedRdd(0)._1.collect().sorted === twoFoldedRdd(1)._2.collect().sorted)
assert(twoFoldedRdd(0)._2.collect().sorted === twoFoldedRdd(1)._1.collect().sorted)
for (folds <- 2 to 10) {
for (seed <- 1 to 5) {
val foldedRdds = MLUtils.kFold(data, folds, seed)
assert(foldedRdds.size === folds)
foldedRdds.map { case (training, validation) =>
val result = validation.union(training).collect().sorted
val validationSize = validation.collect().size.toFloat
assert(validationSize > 0, "empty validation data")
val p = 1 / folds.toFloat
// Within 3 standard deviations of the mean
val range = 3 * math.sqrt(100 * p * (1 - p))
val expected = 100 * p
val lowerBound = expected - range
val upperBound = expected + range
assert(validationSize > lowerBound,
s"Validation data ($validationSize) smaller than expected ($lowerBound)" )
assert(validationSize < upperBound,
s"Validation data ($validationSize) larger than expected ($upperBound)" )
assert(training.collect().size > 0, "empty training data")
assert(result === collectedData,
"Each training+validation set combined should contain all of the data.")
}
// K-fold cross validation should place each element in the validation set exactly once
assert(foldedRdds.map(_._2).reduce((x,y) => x.union(y)).collect().sorted ===
data.collect().sorted)
}
}
}

}
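As a quick sanity check of the 3-standard-deviation bound used in the test above, here is a worked, script-style instance (the fold count of 4 is just an illustrative value, not one the test singles out):

// With 100 elements and folds = 4, each element lands in a given validation fold
// with p = 0.25, so the fold size is roughly Binomial(100, 0.25): mean 25,
// standard deviation sqrt(100 * 0.25 * 0.75), about 4.33.
val p = 1 / 4.0
val expected = 100 * p                        // 25.0
val range = 3 * math.sqrt(100 * p * (1 - p))  // about 12.99
// The assertions accept validation sizes strictly between about 12.0 and 38.0.
println((expected - range, expected + range))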