[SPARK-6793][SPARK-5567][SPARK-8936][MLlib] Perplexity, prediction, and hyperparameter optimization in OnlineLDAOptimizer #7507

Closed
wants to merge 19 commits into from
220 changes: 195 additions & 25 deletions mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -17,7 +17,8 @@

package org.apache.spark.mllib.clustering

import breeze.linalg.{DenseMatrix => BDM, normalize, sum => brzSum, DenseVector => BDV}
import breeze.linalg.{DenseVector => BDV, DenseMatrix => BDM, sum, normalize}
import breeze.numerics.{lgamma, digamma, exp}

import org.apache.hadoop.fs.Path

@@ -53,6 +54,34 @@ abstract class LDAModel private[clustering] extends Saveable {
/** Vocabulary size (number of terms in the vocabulary) */
def vocabSize: Int

/** Dirichlet parameters for document-topic distribution. */
protected def docConcentration: Vector

/**
* Concentration parameter (commonly named "alpha") for the prior placed on documents'
* distributions over topics ("theta").
*
* This is the parameter to a Dirichlet distribution.
*/
def getDocConcentration: Vector = this.docConcentration

/** Dirichlet parameters for topic-word distribution. */
protected def topicConcentration: Double

/**
* Concentration parameter (commonly named "beta" or "eta") for the prior placed on topics'
* distributions over terms.
*
* This is the parameter to a symmetric Dirichlet distribution.
*
* Note: The topics' distributions over terms are called "beta" in the original LDA paper
* by Blei et al., but are called "phi" in many later papers such as Asuncion et al., 2009.
*/
def getTopicConcentration: Double = this.topicConcentration

/** Shape parameter for random initialization of gamma. */
protected def gammaShape: Double

/**
* Inferred topics, where each topic is represented by a distribution over terms.
* This is a matrix of size vocabSize x k, where each column is a topic.
@@ -168,7 +197,10 @@
*/
@Experimental
class LocalLDAModel private[clustering] (
private val topics: Matrix) extends LDAModel with Serializable {
private val topics: Matrix,
protected val docConcentration: Vector,
protected val topicConcentration: Double,
protected val gammaShape: Double) extends LDAModel with Serializable {

override def k: Int = topics.numCols

@@ -186,17 +218,124 @@ class LocalLDAModel private[clustering] (
}.toArray
}

/**
* Backwards-compatibility constructor; assumes a symmetric docConcentration of 1/numTopics,
* topicConcentration = 1, and gammaShape = 100. These defaults are probably NOT what you want
* (set the parameters explicitly in the primary constructor instead).
* @deprecated Provide alpha, eta, and gammaShape in constructor.
*/
@deprecated(
"Provide docConcentration, topicConcentration, and gammaShape in constructor", "1.5.0")
def this(topics: Matrix) {
this(topics, Vectors.dense(Array.fill(topics.numCols)(1.0 / topics.numCols)), 1D, 100D)
}

override protected def formatVersion = "1.0"

override def save(sc: SparkContext, path: String): Unit = {
LocalLDAModel.SaveLoadV1_0.save(sc, path, topicsMatrix)
}

// TODO
// override def logLikelihood(documents: RDD[(Long, Vector)]): Double = ???

// TODO:
// override def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = ???

/**
* Calculate and return the per-word likelihood bound, using the `batch` of
* documents as the evaluation corpus.
*/
// TODO: calculate logPerplexity over the training set online during training, reusing gammad
// instead of re-running variational inference in [[bound()]]
def logPerplexity(
batch: RDD[(Long, Vector)],
totalDocs: Long): Double = {
val corpusWords = batch
.flatMap { case (_, termCounts) => termCounts.toArray }
.reduce(_ + _)
val subsampleRatio = totalDocs.toDouble / batch.count()
val batchVariationalBound = bound(
batch,
subsampleRatio,
docConcentration,
topicConcentration,
topicsMatrix.toBreeze.toDenseMatrix,
gammaShape,
k,
vocabSize)
val perWordBound = batchVariationalBound / (subsampleRatio * corpusWords)

perWordBound
}
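A minimal usage sketch, assuming a trained LocalLDAModel `localModel`, an RDD `heldOut` of (document id, term-count vector) pairs, and a corpus size `totalDocs` (none of these identifiers are part of this patch):

```scala
// Sketch only: `heldOut: RDD[(Long, Vector)]` is a sample drawn from a corpus of `totalDocs` docs.
val perWordBound: Double = localModel.logPerplexity(heldOut, totalDocs)
// perWordBound is a per-word lower bound on log likelihood (<= 0); exp(-perWordBound) gives an
// upper bound on conventional perplexity (lower is better).
println(s"per-word bound: $perWordBound, perplexity bound: ${math.exp(-perWordBound)}")
```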


/**
* Estimate the variational bound of documents from `batch`:
* E_q[log p(batch)] - E_q[log q(batch)]
*/
private def bound(
batch: RDD[(Long, Vector)],
subsampleRatio: Double,
alpha: Vector,
eta: Double,
lambda: BDM[Double],
gammaShape: Double,
k: Int,
vocabSize: Long): Double = {
val brzAlpha = alpha.toBreeze.toDenseVector
// transpose because dirichletExpectation normalizes by row and we need to normalize
// by topic (columns of lambda)
val Elogbeta = LDAUtils.dirichletExpectation(lambda.t).t

var score = batch.map { case (id: Long, termCounts: Vector) =>
var docScore = 0.0D
val (gammad: BDV[Double], _) = OnlineLDAOptimizer.variationalTopicInference(
termCounts, exp(Elogbeta), brzAlpha, gammaShape, k)
val Elogthetad: BDV[Double] = LDAUtils.dirichletExpectation(gammad)

// E[log p(doc | theta, beta)]
termCounts.foreachActive { case (idx, count) =>
docScore += count * LDAUtils.logSumExp(Elogthetad + Elogbeta(idx, ::).t)
}
// E[log p(theta | alpha) - log q(theta | gamma)]; assumes alpha is a vector
docScore += sum((brzAlpha - gammad) :* Elogthetad)
docScore += sum(lgamma(gammad) - lgamma(brzAlpha))
docScore += lgamma(sum(brzAlpha)) - lgamma(sum(gammad))

docScore
}.sum()

// compensate the likelihood for the fact that `batch` is only a sample of the whole corpus
score *= subsampleRatio

// E[log p(beta | eta) - log q(beta | lambda)]; assumes eta is a scalar
score += sum((eta - lambda) :* Elogbeta)
score += sum(lgamma(lambda) - lgamma(eta))

val sumEta = eta * vocabSize
score += sum(lgamma(sumEta) - lgamma(sum(lambda(::, breeze.linalg.*))))

score
}
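For reference, one reading of the quantity accumulated by `bound` (with each logSumExp term weighted by its token count n_dw, consistent with the per-word normalization in `logPerplexity`): with subsample ratio κ (`subsampleRatio`) and V = `vocabSize`,

$$
\mathcal{L} = \kappa \sum_{d} \Big[ \sum_{w} n_{dw}\,\log\!\sum_{k} e^{\,\mathbb{E}[\log\theta_{dk}] + \mathbb{E}[\log\beta_{kw}]}
  + \sum_{k} \big( (\alpha_k - \gamma_{dk})\,\mathbb{E}[\log\theta_{dk}] + \log\Gamma(\gamma_{dk}) - \log\Gamma(\alpha_k) \big)
  + \log\Gamma\Big(\sum_k \alpha_k\Big) - \log\Gamma\Big(\sum_k \gamma_{dk}\Big) \Big]
$$
$$
  \quad + \sum_{k,w} \big( (\eta - \lambda_{kw})\,\mathbb{E}[\log\beta_{kw}] + \log\Gamma(\lambda_{kw}) - \log\Gamma(\eta) \big)
  + \sum_{k} \Big( \log\Gamma(V\eta) - \log\Gamma\Big(\sum_w \lambda_{kw}\Big) \Big)
$$

where the Dirichlet expectations are the digamma-based quantities computed by `LDAUtils.dirichletExpectation`, e.g. E[log θ_dk] = ψ(γ_dk) − ψ(Σ_j γ_dj).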

/**
* Predicts the topic mixture distribution ("theta") for a document, computed from the
* variational parameter gamma.
*/
def topicDistribution(doc: (Long, Vector)): (Long, Vector) = {
// Double transpose because dirichletExpectation normalizes by row and we need to normalize
// by topic (columns of lambda)
val Elogbeta = LDAUtils.dirichletExpectation(topicsMatrix.toBreeze.toDenseMatrix.t).t

val (gamma, _) = OnlineLDAOptimizer.variationalTopicInference(
doc._2,
exp(Elogbeta),
this.docConcentration,
this.gammaShape,
this.k)
(doc._1, Vectors.dense((gamma / sum(gamma)).toArray))
}
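A hypothetical example (identifiers are illustrative, not from this diff) of predicting the topic mixture of a single unseen document with a trained LocalLDAModel `localModel`:

```scala
import org.apache.spark.mllib.linalg.Vectors

// The document is an (id, sparse term-count vector) pair over the model's vocabulary.
val newDoc = (42L, Vectors.sparse(localModel.vocabSize, Seq((0, 3.0), (7, 1.0), (12, 2.0))))
val (docId, theta) = localModel.topicDistribution(newDoc)
// theta has length k and sums to 1: the inferred per-topic proportions for this document.
```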

}

@Experimental
@@ -212,6 +351,8 @@ object LocalLDAModel extends Loader[LocalLDAModel] {
// as a Row in data.
case class Data(topic: Vector, index: Int)

// TODO: explicitly save docConcentration, topicConcentration, and gammaShape for use in
// model.predict()
def save(sc: SparkContext, path: String, topicsMatrix: Matrix): Unit = {
val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._
@@ -287,23 +428,38 @@ class DistributedLDAModel private (
private val globalTopicTotals: LDA.TopicCounts,
val k: Int,
val vocabSize: Int,
private val docConcentration: Double,
private val topicConcentration: Double,
protected val docConcentration: Vector,
protected val topicConcentration: Double,
protected val gammaShape: Double,
private[spark] val iterationTimes: Array[Double]) extends LDAModel {

import LDA._

private[clustering] def this(state: EMLDAOptimizer, iterationTimes: Array[Double]) = {
this(state.graph, state.globalTopicTotals, state.k, state.vocabSize, state.docConcentration,
state.topicConcentration, iterationTimes)
private[clustering] def this(
state: EMLDAOptimizer,
iterationTimes: Array[Double],
gammaShape: Double) = {
this(
state.graph,
state.globalTopicTotals,
state.k,
state.vocabSize,
Vectors.dense(Array.fill(state.k)(state.docConcentration)),
state.topicConcentration,
gammaShape,
iterationTimes)
}

/**
* Convert model to a local model.
* The local model stores the inferred topics but not the topic distributions for training
* documents.
*/
def toLocal: LocalLDAModel = new LocalLDAModel(topicsMatrix)
def toLocal: LocalLDAModel = new LocalLDAModel(
topicsMatrix,
docConcentration,
topicConcentration,
gammaShape)
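A brief sketch, assuming an already-trained DistributedLDAModel named `distModel` (e.g. obtained from `LDA.run` with the EM optimizer), of converting to a local model and reading back the hyperparameters it now carries:

```scala
// Sketch only: `distModel` is an already-trained DistributedLDAModel.
val localModel: LocalLDAModel = distModel.toLocal
println(s"alpha = ${localModel.getDocConcentration}, eta = ${localModel.getTopicConcentration}")
```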

/**
* Inferred topics, where each topic is represented by a distribution over terms.
@@ -375,8 +531,9 @@ class DistributedLDAModel private (
* hyperparameters.
*/
lazy val logLikelihood: Double = {
val eta = topicConcentration
val alpha = docConcentration
// TODO: generalize this for asymmetric (non-scalar) alpha
val alpha = this.docConcentration(0) // To avoid closure capture of enclosing object
val eta = this.topicConcentration
assert(eta > 1.0)
assert(alpha > 1.0)
val N_k = globalTopicTotals
@@ -400,8 +557,9 @@
* log P(topics, topic distributions for docs | alpha, eta)
*/
lazy val logPrior: Double = {
val eta = topicConcentration
val alpha = docConcentration
// TODO: generalize this for asymmetric (non-scalar) alpha
val alpha = this.docConcentration(0) // To avoid closure capture of enclosing object
val eta = this.topicConcentration
// Term vertices: Compute phi_{wk}. Use to compute prior log probability.
// Doc vertex: Compute theta_{kj}. Use to compute prior log probability.
val N_k = globalTopicTotals
@@ -412,12 +570,12 @@ class DistributedLDAModel private (
val N_wk = vertex._2
val smoothed_N_wk: TopicCounts = N_wk + (eta - 1.0)
val phi_wk: TopicCounts = smoothed_N_wk :/ smoothed_N_k
(eta - 1.0) * brzSum(phi_wk.map(math.log))
(eta - 1.0) * sum(phi_wk.map(math.log))
} else {
val N_kj = vertex._2
val smoothed_N_kj: TopicCounts = N_kj + (alpha - 1.0)
val theta_kj: TopicCounts = normalize(smoothed_N_kj, 1.0)
(alpha - 1.0) * brzSum(theta_kj.map(math.log))
(alpha - 1.0) * sum(theta_kj.map(math.log))
}
}
graph.vertices.aggregate(0.0)(seqOp, _ + _)
@@ -448,7 +606,7 @@ class DistributedLDAModel private (
override def save(sc: SparkContext, path: String): Unit = {
DistributedLDAModel.SaveLoadV1_0.save(
sc, path, graph, globalTopicTotals, k, vocabSize, docConcentration, topicConcentration,
iterationTimes)
iterationTimes, gammaShape)
}
}

@@ -478,17 +636,20 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] {
globalTopicTotals: LDA.TopicCounts,
k: Int,
vocabSize: Int,
docConcentration: Double,
docConcentration: Vector,
topicConcentration: Double,
iterationTimes: Array[Double]): Unit = {
iterationTimes: Array[Double],
gammaShape: Double): Unit = {
val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._

val metadata = compact(render
(("class" -> classNameV1_0) ~ ("version" -> thisFormatVersion) ~
("k" -> k) ~ ("vocabSize" -> vocabSize) ~ ("docConcentration" -> docConcentration) ~
("topicConcentration" -> topicConcentration) ~
("iterationTimes" -> iterationTimes.toSeq)))
("k" -> k) ~ ("vocabSize" -> vocabSize) ~
("docConcentration" -> docConcentration.toArray.toSeq) ~
("topicConcentration" -> topicConcentration) ~
("iterationTimes" -> iterationTimes.toSeq) ~
("gammaShape" -> gammaShape)))
sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
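// Illustrative shape of the metadata record written above (field values are placeholders,
// not taken from this patch):
// {"class": <classNameV1_0>, "version": "1.0", "k": 10, "vocabSize": 10000,
//  "docConcentration": [0.1, 0.1, ...], "topicConcentration": 1.01,
//  "iterationTimes": [1.2, 0.9, ...], "gammaShape": 100.0}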

val newPath = new Path(Loader.dataPath(path), "globalTopicTotals").toUri.toString
Expand All @@ -510,9 +671,10 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] {
sc: SparkContext,
path: String,
vocabSize: Int,
docConcentration: Double,
docConcentration: Vector,
topicConcentration: Double,
iterationTimes: Array[Double]): DistributedLDAModel = {
iterationTimes: Array[Double],
gammaShape: Double): DistributedLDAModel = {
val dataPath = new Path(Loader.dataPath(path), "globalTopicTotals").toUri.toString
val vertexDataPath = new Path(Loader.dataPath(path), "topicCounts").toUri.toString
val edgeDataPath = new Path(Loader.dataPath(path), "tokenCounts").toUri.toString
@@ -536,7 +698,7 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] {
val graph: Graph[LDA.TopicCounts, LDA.TokenCount] = Graph(vertices, edges)

new DistributedLDAModel(graph, globalTopicTotals, globalTopicTotals.length, vocabSize,
docConcentration, topicConcentration, iterationTimes)
docConcentration, topicConcentration, gammaShape, iterationTimes)
}

}
@@ -546,15 +708,23 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] {
implicit val formats = DefaultFormats
val expectedK = (metadata \ "k").extract[Int]
val vocabSize = (metadata \ "vocabSize").extract[Int]
val docConcentration = (metadata \ "docConcentration").extract[Double]
val docConcentration =
Vectors.dense((metadata \ "docConcentration").extract[Seq[Double]].toArray)
val topicConcentration = (metadata \ "topicConcentration").extract[Double]
val iterationTimes = (metadata \ "iterationTimes").extract[Seq[Double]]
val gammaShape = (metadata \ "gammaShape").extract[Double]
val classNameV1_0 = SaveLoadV1_0.classNameV1_0

val model = (loadedClassName, loadedVersion) match {
case (className, "1.0") if className == classNameV1_0 => {
DistributedLDAModel.SaveLoadV1_0.load(
sc, path, vocabSize, docConcentration, topicConcentration, iterationTimes.toArray)
sc,
path,
vocabSize,
docConcentration,
topicConcentration,
iterationTimes.toArray,
gammaShape)
}
case _ => throw new Exception(
s"DistributedLDAModel.load did not recognize model with (className, format version):" +