[SPARK-1655][MLLIB] WIP Add option for distributed naive bayes model. #2491

Closed · 2 commits

@@ -232,11 +232,11 @@ class PythonMLLibAPI extends Serializable {
def trainNaiveBayes(
data: JavaRDD[LabeledPoint],
lambda: Double): java.util.List[java.lang.Object] = {
val model = NaiveBayes.train(data.rdd, lambda)
// val model = NaiveBayes.train(data.rdd, lambda, "local")
Contributor Author:
I disabled the Python interface in this PR for now. Let's figure out the Scala implementation first.

val ret = new java.util.LinkedList[java.lang.Object]()
ret.add(Vectors.dense(model.labels))
ret.add(Vectors.dense(model.pi))
ret.add(model.theta)
// ret.add(Vectors.dense(model.labels))
// ret.add(Vectors.dense(model.pi))
// ret.add(model.theta)
ret
}

@@ -17,26 +17,44 @@

package org.apache.spark.mllib.classification

import scala.reflect.ClassTag

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum}

import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.{SparkException, Logging}
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

/**
* Model for Naive Bayes Classifiers.
* Abstract model for a naive Bayes classifier.
*/
abstract class NaiveBayesModel extends ClassificationModel with Serializable {
/**
* Predict values for the given data set using the trained model.
*
* @param testData PairRDD with values representing data points to be predicted
* @return an RDD[(K, Double)] where each entry contains the corresponding prediction,
* partitioned consistently with testData.
*/
def predictValues[K: ClassTag](testData: RDD[(K, Vector)]): RDD[(K, Double)]
}
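For context, a minimal usage sketch of the keyed-prediction contract introduced here (not part of this diff; the SparkContext `sc`, the training data, and the document ids are assumed for illustration):

```scala
// Minimal sketch of the predictValues contract, assuming an existing
// SparkContext `sc`; all data and identifiers here are illustrative.
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val training = sc.parallelize(Seq(
  LabeledPoint(0.0, Vectors.dense(1.0, 0.0)),
  LabeledPoint(1.0, Vectors.dense(0.0, 1.0))))
val model = NaiveBayes.train(training, 1.0, "local")

// Keyed test vectors, e.g. (documentId, features).
val keyedTest = sc.parallelize(Seq(
  (10L, Vectors.dense(2.0, 0.0)),
  (11L, Vectors.dense(0.0, 3.0))))

// RDD[(Long, Double)] of (documentId, predictedLabel), partitioned like the input.
val predictions = model.predictValues(keyedTest)
```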

/**
* Local model for a naive Bayes classifier.
*
* @param labels list of labels
* @param pi log of class priors, whose dimension is C, number of labels
* @param theta log of class conditional probabilities, whose dimension is C-by-D,
* where D is number of features
*/
class NaiveBayesModel private[mllib] (
private class LocalNaiveBayesModel(
val labels: Array[Double],
val pi: Array[Double],
val theta: Array[Array[Double]]) extends ClassificationModel with Serializable {
val theta: Array[Array[Double]]) extends NaiveBayesModel {

private val brzPi = new BDV[Double](pi)
private val brzTheta = new BDM[Double](theta.length, theta(0).length)
@@ -54,17 +72,60 @@ class NaiveBayesModel private[mllib] (
}
}

override def predict(testData: RDD[Vector]): RDD[Double] = {
def predict(testData: RDD[Vector]): RDD[Double] = {
val bcModel = testData.context.broadcast(this)
testData.mapPartitions { iter =>
val model = bcModel.value
iter.map(model.predict)
}
}

override def predict(testData: Vector): Double = {
def predict(testData: Vector): Double = {
labels(brzArgmax(brzPi + brzTheta * testData.toBreeze))
}

def predictValues[K: ClassTag](testData: RDD[(K, Vector)]): RDD[(K, Double)] = {
val bcModel = testData.context.broadcast(this)
testData.mapValues { test =>
bcModel.value.predict(test)
}
}
}
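The local predict above scores a test point in log space: it picks the label whose class prior plus feature-weighted conditional log-probabilities is largest. A plain-Scala sketch of that scoring rule with made-up numbers (no Breeze, purely illustrative):

```scala
// Plain-Scala sketch of the log-space scoring used by predict; the values
// below are made up, with pi(c) = log P(c) and theta(c)(d) = log P(d | c).
val labels = Array(0.0, 1.0)
val pi     = Array(math.log(0.5), math.log(0.5))
val theta  = Array(
  Array(math.log(0.9), math.log(0.1)),
  Array(math.log(0.2), math.log(0.8)))
val x = Array(3.0, 1.0)  // feature counts of one test point

val scores    = pi.indices.map(c => pi(c) + theta(c).zip(x).map { case (t, v) => t * v }.sum)
val predicted = labels(scores.indices.maxBy(scores))  // argmax over classes => 0.0
```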

/**
* Distributed model for a naive Bayes classifier.
*
* @param model RDD of (label, pi, theta) rows comprising the model.
*/
private class DistNaiveBayesModel(val model: RDD[(Double, Double, BDV[Double])])
extends NaiveBayesModel {

def predict(testData: RDD[Vector]): RDD[Double] = {
val indexed = testData.zipWithIndex().map(_.swap)
// Predict, reorder the results to match the input order, then project the labels.
predictValues(indexed).sortByKey().map(_._2)
}

def predict(testData: Vector): Double = {
val testBreeze = testData.toBreeze
model.map { case (label, pi, theta) =>
(pi + theta.dot(testBreeze), label)
}.max._2
}

def predictValues[K: ClassTag](testData: RDD[(K, Vector)]): RDD[(K, Double)] = {
// Pair each test data point with all model rows.
val testXModel = testData.mapValues(_.toBreeze).cartesian(model)

// Compute the posterior distribution for every test point.
val posterior = testXModel.map { case ((key, test), (label, pi, theta)) =>
(key, (pi + theta.dot(test), label))
}

// Find the maximum a posteriori value for each test data point, then project labels.
val partitioner = testData.partitioner.getOrElse(defaultPartitioner(posterior))
posterior.reduceByKey(partitioner, Ordering[(Double, Double)].max _).mapValues(_._2)
}
}
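The distributed predictValues relies on reduceByKey with Ordering[(Double, Double)].max as the combiner: tuples compare lexicographically, so the log-posterior is compared first and the label only breaks ties. A standalone sketch of that per-key argmax pattern (assuming an existing SparkContext `sc`; the ids and scores are illustrative):

```scala
// Per-key argmax via reduceByKey, as used in predictValues above.
// Assumes an existing SparkContext `sc`; the data below is illustrative.
import org.apache.spark.SparkContext._

val scored = sc.parallelize(Seq(
  (1L, (-2.3, 0.0)),  // (testPointId, (logPosterior, label))
  (1L, (-0.7, 1.0)),
  (2L, (-1.1, 0.0)),
  (2L, (-4.2, 1.0))))

// Keep the highest-scoring (logPosterior, label) pair per key, then project the label.
val best = scored.reduceByKey(Ordering[(Double, Double)].max _).mapValues(_._2)
best.collect()  // e.g. Array((1L, 1.0), (2L, 0.0)), order may vary
```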

/**
@@ -77,6 +138,8 @@ class NaiveBayesModel private[mllib] (
*/
class NaiveBayes private (private var lambda: Double) extends Serializable with Logging {

private var distMode = "local"

def this() = this(1.0)

/** Set the smoothing parameter. Default: 1.0. */
@@ -85,6 +148,12 @@ class NaiveBayes private (private var lambda: Double) extends Serializable with
this
}

/** Set the model distribution mode, either "local" or "dist" (for distributed). */
def setDistMode(distMode: String): NaiveBayes = {
this.distMode = distMode
this
}

/**
* Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries.
*
@@ -103,10 +172,8 @@ class NaiveBayes private (private var lambda: Double) extends Serializable with
}
}

// Aggregates term frequencies per label.
// TODO: Calling combineByKey and collect creates two stages, we can implement something
// TODO: similar to reduceByKeyLocally to save one stage.
val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, BDV[Double])](
// Sum the document counts and feature frequencies for each label.
val labelAggregates = data.map(p => (p.label, p.features)).combineByKey[(Long, BDV[Double])](
createCombiner = (v: Vector) => {
requireNonnegativeValues(v)
(1L, v.toBreeze.toDenseVector)
@@ -117,7 +184,20 @@ class NaiveBayes private (private var lambda: Double) extends Serializable with
},
mergeCombiners = (c1: (Long, BDV[Double]), c2: (Long, BDV[Double])) =>
(c1._1 + c2._1, c1._2 += c2._2)
).collect()
)

distMode match {
case "local" => trainLocalModel(labelAggregates)
case "dist" => trainDistModel(labelAggregates)
case _ =>
throw new SparkException(s"Naive Bayes requires a valid distMode but found $distMode.")
}
}
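As a usage sketch of the new setter (assuming an existing RDD[LabeledPoint] called `training`; not part of this diff): the model representation is chosen at training time, and any other mode string fails fast.

```scala
// Builder-style usage of the new distMode option; `training` is an assumed
// RDD[LabeledPoint] and is not part of this diff.
val localModel = new NaiveBayes().setDistMode("local").run(training)  // in-memory arrays
val distModel  = new NaiveBayes().setDistMode("dist").run(training)   // model kept as an RDD

// new NaiveBayes().setDistMode("cluster").run(training)  // throws SparkException
```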

private def trainLocalModel(labelAggregates: RDD[(Double, (Long, BDV[Double]))]) = {
// TODO: Calling combineByKey and collect creates two stages, we can implement something
// TODO: similar to reduceByKeyLocally to save one stage.
val aggregated = labelAggregates.collect()
val numLabels = aggregated.length
var numDocuments = 0L
aggregated.foreach { case (_, (n, _)) =>
@@ -141,7 +221,28 @@ class NaiveBayes private (private var lambda: Double) extends Serializable with
i += 1
}

new NaiveBayesModel(labels, pi, theta)
new LocalNaiveBayesModel(labels, pi, theta)
}

private def trainDistModel(labelAggregates: RDD[(Double, (Long, BDV[Double]))]) = {
// Compute the model's prior (pi) value and conditional (theta) vector for each label.
// NOTE: In contrast to the local trainer, the piLogDenom normalization term is omitted here.
// Computing this term would require an additional aggregation over 'labelAggregates', and because
// the term is an additive constant it does not affect maximum a posteriori model prediction.
val model = labelAggregates.map { case (label, (numDocuments, sumFeatures)) =>
val pi = math.log(numDocuments + lambda)
val thetaLogDenom = math.log(brzSum(sumFeatures) + sumFeatures.length * lambda)
val theta = new Array[Double](sumFeatures.length)
sumFeatures.iterator.map(f => math.log(f._2 + lambda) - thetaLogDenom).copyToArray(theta)
(label, pi, new BDV[Double](theta))
}

// Materialize and persist the model, and check that it is nonempty.
if (model.persist(StorageLevel.MEMORY_AND_DISK).count() == 0) {
throw new SparkException("Naive Bayes requires a nonempty training RDD.")
}

new DistNaiveBayesModel(model)
}
}
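To make the NOTE in trainDistModel concrete: the omitted piLogDenom term is the same constant for every label, and subtracting a shared constant from every class score cannot change which class attains the maximum. A tiny self-contained check (illustrative values only):

```scala
// Subtracting a constant shared by all classes (such as piLogDenom) leaves
// the argmax unchanged; the scores and the constant below are made up.
val rawScores  = Seq(("a", -3.2), ("b", -1.5), ("c", -2.8))
val piLogDenom = math.log(100.0)
val shifted    = rawScores.map { case (label, s) => (label, s - piLogDenom) }

assert(rawScores.maxBy(_._2)._1 == shifted.maxBy(_._2)._1)  // both pick "b"
```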

@@ -177,8 +278,9 @@ object NaiveBayes {
* @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency
* vector or a count vector.
* @param lambda The smoothing parameter
* @param distMode The model distribution mode, either "local" or "dist" (for distributed)
*/
def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel = {
new NaiveBayes(lambda).run(input)
def train(input: RDD[LabeledPoint], lambda: Double, distMode: String): NaiveBayesModel = {
new NaiveBayes(lambda).setDistMode(distMode).run(input)
}
}
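Dropping the two-argument train overload is a source-incompatible change (the Java suite below had to be updated to pass "local"). If compatibility with existing call sites matters, one option, sketched here only as a suggestion rather than part of this diff, is to keep an overload in object NaiveBayes that defaults to the local mode:

```scala
// Possible compatibility overload (a suggestion, not part of this diff):
// preserve the old two-argument signature by delegating to the new one.
def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel =
  train(input, lambda, "local")
```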
@@ -84,7 +84,7 @@ public void runUsingStaticMethods() {
int numAccurate1 = validatePrediction(POINTS, model1);
Assert.assertEquals(POINTS.size(), numAccurate1);

NaiveBayesModel model2 = NaiveBayes.train(testRDD.rdd(), 0.5);
NaiveBayesModel model2 = NaiveBayes.train(testRDD.rdd(), 0.5, "local");
int numAccurate2 = validatePrediction(POINTS, model2);
Assert.assertEquals(POINTS.size(), numAccurate2);
}
@@ -21,10 +21,13 @@ import scala.util.Random

import org.scalatest.FunSuite

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.SparkException
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.{LocalClusterSparkContext, LocalSparkContext}
import org.apache.spark.rdd.RDD

object NaiveBayesSuite {

@@ -63,12 +66,19 @@ object NaiveBayesSuite {
class NaiveBayesSuite extends FunSuite with LocalSparkContext {

def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
val numOfPredictions = predictions.zip(input).count {
val numOfWrongPredictions = predictions.zip(input).count {
case (prediction, expected) =>
prediction != expected.label
}
// At least 80% of the predictions should be on.
assert(numOfPredictions < input.length / 5)
// At least 80% of the predictions should be correct.
assert(numOfWrongPredictions < input.length / 5)
}

def validatePairPrediction(predictions: RDD[(Long, Double)], input: RDD[(Long, LabeledPoint)]) {
assert(predictions.partitioner == input.partitioner)
assert(predictions.sortByKey().keys.collect().deep == input.sortByKey().keys.collect().deep)
val joined = predictions.join(input).values
validatePrediction(joined.map(_._1).collect(), joined.map(_._2).collect())
}
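A side note on the `.deep` comparison above: Scala arrays are JVM arrays, so `==` compares references, and `.deep` is used to get an element-wise comparison. A tiny illustration:

```scala
// Arrays compare by reference; `.deep` yields a view that compares by element.
val a = Array(1L, 2L, 3L)
val b = Array(1L, 2L, 3L)
assert(a != b)            // reference equality
assert(a.deep == b.deep)  // structural equality
```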

test("Naive Bayes") {
@@ -95,6 +105,63 @@ class NaiveBayesSuite extends FunSuite with LocalSparkContext {

// Test prediction on Array.
validatePrediction(validationData.map(row => model.predict(row.features)), validationData)

// Test prediction on PairRDD.
val validationPairRDD = validationRDD.zipWithUniqueId().map(_.swap).partitionBy(
new HashPartitioner(2))
val predicted = model.predictValues(validationPairRDD.mapValues(_.features))
validatePairPrediction(predicted, validationPairRDD)
}

test("distributed naive bayes") {
val nPoints = 10000
val nLabels = 10
val nFeatures = 30

def logNormalize(s: Seq[Int]) = {
s.map(_.toDouble / s.sum).map(math.log)
}

val pi = logNormalize(1 to nLabels).toArray
val theta = (for(l <- 1 to nLabels; f <- 1 to nFeatures)
yield if (f == l) 1000 else 1 // Each label is dominated by a different feature.
).grouped(nFeatures).map(logNormalize).map(_.toArray).toArray

val trainData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 42)
val trainRDD = sc.parallelize(trainData, 1)
trainRDD.cache()

val model = NaiveBayes.train(trainRDD, 1.0, "dist")

val validationData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 17)
val validationRDD = sc.parallelize(validationData, 2)

// Test prediction on RDD.
validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)

// Test prediction on Array.
val shortValData = validationData.take(nPoints / 100)
validatePrediction(shortValData.map(row => model.predict(row.features)), shortValData)

// Test prediction on PairRDD.
val validationPairRDD = validationRDD.zipWithUniqueId().map(_.swap).partitionBy(
new HashPartitioner(2))
val predicted = model.predictValues(validationPairRDD.mapValues(_.features))
validatePairPrediction(predicted, validationPairRDD)
}
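For reference, the logNormalize helper used in this test turns raw weights into a log probability distribution, for example:

```scala
// Worked example of the logNormalize helper defined in the test above.
def logNormalize(s: Seq[Int]) = s.map(_.toDouble / s.sum).map(math.log)

logNormalize(Seq(1, 3))  // Seq(log(0.25), log(0.75)) ≈ Seq(-1.386, -0.288)
```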

test("distributed naive bayes with empty train RDD") {
val emptyTrainRDD = sc.parallelize(new Array[LabeledPoint](0), 2)
intercept[SparkException] {
NaiveBayes.train(emptyTrainRDD, 1.0, "dist")
}
}

test("distributed naive bayes with empty test RDD") {
val trainRDD = sc.parallelize(LabeledPoint(1.0, Vectors.dense(2.0)) :: Nil, 2)
val model = NaiveBayes.train(trainRDD, 1.0, "dist")
val emptyTestRDD = sc.parallelize(new Array[Vector](0), 2)
assert(model.predict(emptyTestRDD).count == 0)
}

test("detect negative values") {