[SPARK-24555][ML] logNumExamples in KMeans/BiKM/GMM/AFT/NB
## What changes were proposed in this pull request?
Log the number of training examples via `logNumExamples` in KMeans, BisectingKMeans (BiKM), GaussianMixture (GMM), AFTSurvivalRegression (AFT), and NaiveBayes (NB); LinearSVC and LogisticRegression switch from `logNamedValue(Instrumentation.loggerTags.numExamples, ...)` to the new helper.

## How was this patch tested?
Existing tests.

Closes apache#21561 from zhengruifeng/alg_logNumExamples.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
zhengruifeng authored and srowen committed Aug 16, 2018
1 parent b3e6fe7 commit e501924
Showing 8 changed files with 42 additions and 18 deletions.
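
The diffs below replace verbose `logNamedValue(Instrumentation.loggerTags.numExamples, ...)` calls with `instr.logNumExamples(...)`, and add that call to algorithms that never logged a count at all. A minimal sketch of how such a helper can be defined, assuming it simply standardizes the tag (illustrative stand-in, not the Spark source):

```scala
// Illustrative sketch, not the Spark source: a logNumExamples convenience
// that fixes the tag name and accepts the count as a Long.
object LoggerTags {
  val numExamples = "numExamples"
}

class Instr {
  def logNamedValue(name: String, value: String): Unit =
    println(s"""{"$name": "$value"}""")

  // One fixed tag, no string formatting at the call site.
  def logNumExamples(num: Long): Unit =
    logNamedValue(LoggerTags.numExamples, num.toString)
}
```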
@@ -189,7 +189,7 @@ class LinearSVC @Since("2.2.0") (
(new MultivariateOnlineSummarizer, new MultiClassSummarizer)
)(seqOp, combOp, $(aggregationDepth))
}
- instr.logNamedValue(Instrumentation.loggerTags.numExamples, summarizer.count)
+ instr.logNumExamples(summarizer.count)
instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString)
instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString)

@@ -519,7 +519,7 @@ class LogisticRegression @Since("1.2.0") (
(new MultivariateOnlineSummarizer, new MultiClassSummarizer)
)(seqOp, combOp, $(aggregationDepth))
}
- instr.logNamedValue(Instrumentation.loggerTags.numExamples, summarizer.count)
+ instr.logNumExamples(summarizer.count)
instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString)
instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString)

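Both classifier hunks read the count off the `MultivariateOnlineSummarizer` that training already builds, so no extra pass over the data is needed; the `AFTSurvivalRegression` hunk further down does the same with `featuresSummarizer.count`. A small local usage sketch:

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer

// The summarizer accumulates per-feature statistics and a running sample
// count in one pass; `count` is the value logNumExamples reports above.
val summarizer = new MultivariateOnlineSummarizer
summarizer.add(Vectors.dense(1.0, 2.0))
summarizer.add(Vectors.dense(3.0, 4.0))
assert(summarizer.count == 2L)
```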
@@ -162,19 +162,21 @@ class NaiveBayes @Since("1.5.0") (
// TODO: similar to reduceByKeyLocally to save one stage.
val aggregated = dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd
.map { row => (row.getDouble(0), (row.getDouble(1), row.getAs[Vector](2)))
- }.aggregateByKey[(Double, DenseVector)]((0.0, Vectors.zeros(numFeatures).toDense))(
+ }.aggregateByKey[(Double, DenseVector, Long)]((0.0, Vectors.zeros(numFeatures).toDense, 0L))(
seqOp = {
-   case ((weightSum: Double, featureSum: DenseVector), (weight, features)) =>
+   case ((weightSum, featureSum, count), (weight, features)) =>
requireValues(features)
BLAS.axpy(weight, features, featureSum)
-     (weightSum + weight, featureSum)
+     (weightSum + weight, featureSum, count + 1)
},
combOp = {
-   case ((weightSum1, featureSum1), (weightSum2, featureSum2)) =>
+   case ((weightSum1, featureSum1, count1), (weightSum2, featureSum2, count2)) =>
BLAS.axpy(1.0, featureSum2, featureSum1)
-     (weightSum1 + weightSum2, featureSum1)
+     (weightSum1 + weightSum2, featureSum1, count1 + count2)
}).collect().sortBy(_._1)

+ val numSamples = aggregated.map(_._2._3).sum
+ instr.logNumExamples(numSamples)
val numLabels = aggregated.length
instr.logNumClasses(numLabels)
val numDocuments = aggregated.map(_._2._1).sum
@@ -186,7 +188,7 @@
val lambda = $(smoothing)
val piLogDenom = math.log(numDocuments + numLabels * lambda)
var i = 0
- aggregated.foreach { case (label, (n, sumTermFreqs)) =>
+ aggregated.foreach { case (label, (n, sumTermFreqs, _)) =>
labelArray(i) = label
piArray(i) = math.log(n + lambda) - piLogDenom
val thetaLogDenom = $(modelType) match {
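The NaiveBayes change widens the `aggregateByKey` accumulator from `(weightSum, featureSum)` to `(weightSum, featureSum, count)`, so the example count falls out of the pass that was already summing weights and feature vectors. A self-contained toy version of the same pattern, reduced to weights only (hypothetical local-mode setup):

```scala
import org.apache.spark.sql.SparkSession

object CountInAggregate {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
    val sc = spark.sparkContext

    // (label, weight) pairs standing in for the (label, (weight, features)) rows.
    val data = sc.parallelize(Seq((0.0, 1.0), (0.0, 2.0), (1.0, 1.5)))

    // Widened accumulator: (weightSum, count) instead of weightSum alone.
    val aggregated = data.aggregateByKey((0.0, 0L))(
      seqOp = { case ((weightSum, count), weight) => (weightSum + weight, count + 1L) },
      combOp = { case ((w1, c1), (w2, c2)) => (w1 + w2, c1 + c2) }
    ).collect().sortBy(_._1)

    val numSamples = aggregated.map(_._2._2).sum  // 3, what logNumExamples would report
    println(s"numExamples = $numSamples")
    spark.stop()
  }
}
```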
@@ -273,11 +273,12 @@ class BisectingKMeans @Since("2.0.0") (
.setMinDivisibleClusterSize($(minDivisibleClusterSize))
.setSeed($(seed))
.setDistanceMeasure($(distanceMeasure))
- val parentModel = bkm.run(rdd)
+ val parentModel = bkm.run(rdd, Some(instr))
val model = copyValues(new BisectingKMeansModel(uid, parentModel).setParent(this))
val summary = new BisectingKMeansSummary(
model.transform(dataset), $(predictionCol), $(featuresCol), $(k), $(maxIter))
instr.logNamedValue("clusterSizes", summary.clusterSizes)
+ instr.logNumFeatures(model.clusterCenters.head.size)
model.setSummary(Some(summary))
}

@@ -386,6 +386,11 @@ class GaussianMixture @Since("2.0.0") (
bcWeights.destroy(blocking = false)
bcGaussians.destroy(blocking = false)

+ if (iter == 0) {
+   val numSamples = sums.count
+   instr.logNumExamples(numSamples)
+ }
+
/*
Create new distributions based on the partial assignments
(often referred to as the "M" step in literature)
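Here `sums` is the aggregate produced by the E-step, so its `count` comes for free; guarding on `iter == 0` records it exactly once, since the value cannot change across iterations. The shape of the pattern with the EM details stripped away (a hedged sketch, `log` standing in for the instrumentation call):

```scala
// Minimal log-once loop: the dataset size is known after the first
// aggregation and is constant, so it is recorded only on iteration 0.
def iterate(points: Seq[Double], maxIter: Int, log: Long => Unit): Double = {
  var mean = 0.0
  var iter = 0
  while (iter < maxIter) {
    val n = points.size.toLong   // stands in for sums.count
    if (iter == 0) log(n)        // the guard added above
    mean = points.sum / n
    iter += 1
  }
  mean
}
```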
@@ -236,6 +236,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: String)
fitIntercept, maxIter, tol, aggregationDepth)
instr.logNamedValue("quantileProbabilities.size", $(quantileProbabilities).length)
instr.logNumFeatures(numFeatures)
+ instr.logNumExamples(featuresSummarizer.count)

if (!$(fitIntercept) && (0 until numFeatures).exists { i =>
featuresStd(i) == 0.0 && featuresSummarizer.mean(i) != 0.0 }) {
@@ -25,6 +25,7 @@ import scala.collection.mutable
import org.apache.spark.annotation.Since
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
+ import org.apache.spark.ml.util.Instrumentation
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
@@ -151,13 +152,10 @@ class BisectingKMeans private (
this
}

- /**
-  * Runs the bisecting k-means algorithm.
-  * @param input RDD of vectors
-  * @return model for the bisecting kmeans
-  */
- @Since("1.6.0")
- def run(input: RDD[Vector]): BisectingKMeansModel = {
+
+ private[spark] def run(
+     input: RDD[Vector],
+     instr: Option[Instrumentation]): BisectingKMeansModel = {
if (input.getStorageLevel == StorageLevel.NONE) {
logWarning(s"The input RDD ${input.id} is not directly cached, which may hurt performance if"
+ " its parent RDDs are also not cached.")
@@ -171,6 +169,7 @@
val vectors = input.zip(norms).map { case (x, norm) => new VectorWithNorm(x, norm) }
var assignments = vectors.map(v => (ROOT_INDEX, v))
var activeClusters = summarize(d, assignments, dMeasure)
+ instr.foreach(_.logNumExamples(activeClusters.values.map(_.size).sum))
val rootSummary = activeClusters(ROOT_INDEX)
val n = rootSummary.size
logInfo(s"Number of points: $n.")
@@ -246,6 +245,16 @@
new BisectingKMeansModel(root, this.distanceMeasure)
}

+ /**
+  * Runs the bisecting k-means algorithm.
+  * @param input RDD of vectors
+  * @return model for the bisecting kmeans
+  */
+ @Since("1.6.0")
+ def run(input: RDD[Vector]): BisectingKMeansModel = {
+   run(input, None)
+ }
+
/**
* Java-friendly version of `run()`.
*/
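The `.mllib` refactor keeps the public `run(input)` signature intact and moves the work into a `private[spark]` overload taking an `Option[Instrumentation]`: the `.ml` estimator above passes `Some(instr)`, external callers get `None`, and each logging site degrades to a no-op through `foreach`. A compact sketch of the same shape, with a hypothetical `Instr` trait in place of Spark's `Instrumentation`:

```scala
trait Instr { def logNumExamples(n: Long): Unit }

class Bisecting {
  // Public entry point: unchanged signature, instrumentation off.
  def run(input: Seq[Double]): Double = run(input, None)

  // Package-private in the real code; instrumentation threaded as an Option.
  private def run(input: Seq[Double], instr: Option[Instr]): Double = {
    instr.foreach(_.logNumExamples(input.size.toLong))  // no-op when None
    input.sum / input.size
  }
}
```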
@@ -299,7 +299,7 @@ class KMeans private (
val bcCenters = sc.broadcast(centers)

// Find the new centers
- val newCenters = data.mapPartitions { points =>
+ val collected = data.mapPartitions { points =>
val thisCenters = bcCenters.value
val dims = thisCenters.head.vector.size

@@ -317,7 +317,13 @@
}.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
axpy(1.0, sum2, sum1)
(sum1, count1 + count2)
- }.collectAsMap().mapValues { case (sum, count) =>
+ }.collectAsMap()
+
+ if (iteration == 0) {
+   instr.foreach(_.logNumExamples(collected.values.map(_._2).sum))
+ }
+
+ val newCenters = collected.mapValues { case (sum, count) =>
distanceMeasureInstance.centroid(sum, count)
}

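The KMeans change is pure re-plumbing: the chained `.collectAsMap().mapValues` is split so the per-cluster `(sum, count)` map gets a name first, letting the counts be summed for `logNumExamples` before the centroids are derived. In miniature, with plain collections standing in for the collected map and `centroid` as a hypothetical stand-in for `distanceMeasureInstance.centroid`:

```scala
// Per-cluster (sum, count) pairs, as collectAsMap() would return them.
val collected: Map[Int, (Array[Double], Long)] = Map(
  0 -> (Array(2.0, 4.0), 2L),
  1 -> (Array(3.0, 3.0), 1L))

// Binding the map first makes the total example count a one-liner.
val numExamples = collected.values.map(_._2).sum  // 3

// Centroid as the coordinate-wise mean of the summed vectors.
def centroid(sum: Array[Double], count: Long): Array[Double] =
  sum.map(_ / count)

val newCenters = collected.map { case (k, (sum, count)) => k -> centroid(sum, count) }
```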
