
[SPARK-17455][MLlib] Improve PAVA implementation in IsotonicRegression #15018


Closed · wants to merge 15 commits
mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -236,9 +236,8 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] {
* Only univariate (single feature) algorithm supported.
*
* Sequential PAV implementation based on:
* Tibshirani, Ryan J., Holger Hoefling, and Robert Tibshirani.
* "Nearly-isotonic regression." Technometrics 53.1 (2011): 54-61.
* Available from <a href="http://www.stat.cmu.edu/~ryantibs/papers/neariso.pdf">here</a>
* Grotzinger, S. J., and C. Witzgall.
* "Projections onto order simplexes." Applied mathematics and Optimization 12.1 (1984): 247-270.
*
* Sequential PAV parallelization based on:
* Kearsley, Anthony J., Richard A. Tapia, and Michael W. Trosset.
@@ -312,90 +311,118 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Serializable {
}

/**
* Performs a pool adjacent violators algorithm (PAV).
* Uses approach with single processing of data where violators
* in previously processed data created by pooling are fixed immediately.
* Uses optimization of discovering monotonicity violating sequences (blocks).
* Performs a pool adjacent violators algorithm (PAV). Implements the algorithm originally
* described in [1], using the formulation from [2, 3]. Uses an array to keep track of start
* and end indices of blocks.
*
* @param input Input data of tuples (label, feature, weight).
* [1] Grotzinger, S. J., and C. Witzgall. "Projections onto order simplexes." Applied
* Mathematics and Optimization 12.1 (1984): 247-270.
*
* [2] Best, Michael J., and Nilotpal Chakravarti. "Active set algorithms for isotonic
* regression; a unifying framework." Mathematical Programming 47.1-3 (1990): 425-439.
*
* [3] Best, Michael J., Nilotpal Chakravarti, and Vasant A. Ubhaya. "Minimizing separable convex
* functions subject to simple chain constraints." SIAM Journal on Optimization 10.3 (2000):
* 658-672.
*
* @param input Input data of tuples (label, feature, weight). Weights must
* be non-negative.
* @return Result tuples (label, feature, weight) where labels were updated
* to form a monotone sequence as per isotonic regression definition.
*/
private def poolAdjacentViolators(
input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {

if (input.isEmpty) {
return Array.empty
val cleanInput = input.filter { case (y, x, weight) =>
require(
weight >= 0.0,
s"Negative weight at point ($y, $x, $weight). Weights must be non-negative"
)
weight > 0
}

// Pools sub array within given bounds assigning weighted average value to all elements.
def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
val poolSubArray = input.slice(start, end + 1)
if (cleanInput.isEmpty) {
return Array.empty
}

val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum
val weight = poolSubArray.map(_._3).sum
// Keeps track of the start and end indices of the blocks. If [i, j] is a valid block from
// cleanInput(i) to cleanInput(j) (inclusive), then blockBounds(i) = j and blockBounds(j) = i.
// Initially, each data point is its own block.
val blockBounds = Array.range(0, cleanInput.length)
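// Illustration (editor's note, not part of the patch): with four points split into
// two blocks [0, 1] and [2, 3], blockBounds would be Array(1, 0, 3, 2):
// blockBounds(0) = 1 marks the end of the first block, and blockBounds(1) = 0
// points back to its start. Array.range(0, n) encodes n singleton blocks.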

var i = start
while (i <= end) {
input(i) = (weightedSum / weight, input(i)._2, input(i)._3)
i = i + 1
}
// Keep track of the sum of weights and sum of weight * y for each block. weights(start)
// gives the values for the block. Entries that are not at the start of a block
// are meaningless.
val weights: Array[(Double, Double)] = cleanInput.map { case (y, _, weight) =>
(weight, weight * y)
}
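// Illustration (editor's note, not part of the patch): pooling a point
// (y = 1.0, w = 2.0) with a point (y = 3.0, w = 1.0) yields
// weights(start) = (3.0, 5.0), so the block average is 5.0 / 3.0.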

var i = 0
val len = input.length
while (i < len) {
var j = i
// A few convenience functions to make the code more readable.

// Find monotonicity violating sequence, if any.
while (j < len - 1 && input(j)._1 > input(j + 1)._1) {
j = j + 1
}
// blockStart and blockEnd have identical implementations. We create two different
// functions to make the code more expressive
def blockEnd(start: Int): Int = blockBounds(start)
def blockStart(end: Int): Int = blockBounds(end)

// If monotonicity was not violated, move to next data point.
if (i == j) {
i = i + 1
} else {
// Otherwise pool the violating sequence
// and check if pooling caused monotonicity violation in previously processed points.
while (i >= 0 && input(i)._1 > input(i + 1)._1) {
pool(input, i, j)
i = i - 1
}
// the next block starts at the index after the end of this block
def nextBlock(start: Int): Int = blockEnd(start) + 1

i = j
}
// the previous block ends at the index before the start of this block
// we then use blockStart to find the start
def prevBlock(start: Int): Int = blockStart(start - 1)

// Merge two adjacent blocks, updating blockBounds and weights to reflect the merge
// Return the start index of the merged block
def merge(block1: Int, block2: Int): Int = {
assert(
blockEnd(block1) + 1 == block2,
s"Attempting to merge non-consecutive blocks [${block1}, ${blockEnd(block1)}]" +
s" and [${block2}, ${blockEnd(block2)}]. This is likely a bug in the isotonic regression" +
" implementation. Please file a bug report."
)
blockBounds(block1) = blockEnd(block2)
blockBounds(blockEnd(block2)) = block1
val w1 = weights(block1)
val w2 = weights(block2)
weights(block1) = (w1._1 + w2._1, w1._2 + w2._2)
block1
}
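// Illustration (editor's note, not part of the patch): merging singleton blocks
// [0, 0] and [1, 1] sets blockBounds(0) = 1 and blockBounds(1) = 0, sums their
// (weight, weight * y) entries into weights(0), and returns 0 as the merged start.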

// For points having the same prediction, we only keep two boundary points.
val compressed = ArrayBuffer.empty[(Double, Double, Double)]
// average value of a block
def average(start: Int): Double = weights(start)._2 / weights(start)._1

var (curLabel, curFeature, curWeight) = input.head
var rightBound = curFeature
def merge(): Unit = {
compressed += ((curLabel, curFeature, curWeight))
if (rightBound > curFeature) {
compressed += ((curLabel, rightBound, 0.0))
// Implement Algorithm PAV from [3].
// Merge on >= instead of > because it eliminates adjacent blocks with the same average, and we
// want to compress our output as much as possible. Both give correct results.
var i = 0
while (nextBlock(i) < cleanInput.length) {
if (average(i) >= average(nextBlock(i))) {
merge(i, nextBlock(i))
while((i > 0) && (average(prevBlock(i)) >= average(i))) {
Review comment (Member): Super nit: space before while here and below. You can probably nix the extra parens around each of the two terms, but no big deal.

i = merge(prevBlock(i), i)
}
} else {
i = nextBlock(i)
}
}
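// Worked trace (editor's note, not part of the patch): for labels (3, 1, 2) with
// unit weights, i = 0 sees 3 >= 1 and merges to a block with average 2; the next
// comparison sees 2 >= 2, so the tie is merged as well, leaving one block [0, 2]
// with average 2 and total weight 3.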
i = 1
while (i < input.length) {
val (label, feature, weight) = input(i)
if (label == curLabel) {
curWeight += weight
rightBound = feature

// construct the output by walking through the blocks in order
val output = ArrayBuffer.empty[(Double, Double, Double)]
i = 0
while (i < cleanInput.length) {
// If the block has more than one point, output a point at the start and the end
// of the block, each receiving half the block's weight. Otherwise, output a
// single point with all the weight.
if (cleanInput(blockEnd(i))._2 > cleanInput(i)._2) {
output += ((average(i), cleanInput(i)._2, weights(i)._1 / 2))
output += ((average(i), cleanInput(blockEnd(i))._2, weights(i)._1 / 2))
} else {
merge()
curLabel = label
curFeature = feature
curWeight = weight
rightBound = curFeature
output += ((average(i), cleanInput(i)._2, weights(i)._1))
}
i += 1
i = nextBlock(i)
}
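// Illustration (editor's note, not part of the patch): the single block [0, 2]
// from the trace above, at features 0.0, 1.0, 2.0, emits only the two boundary
// points ((2.0, 0.0, 1.5), (2.0, 2.0, 1.5)); the interior point is dropped.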
merge()

compressed.toArray
output.toArray
}
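As a companion to the patch, here is a minimal standalone sketch of the same pooling idea (an editor's illustration, not part of this PR; the pava helper, its plain-array signature, and the sample call are assumptions). It applies the same merge-on->= rule, so blocks with equal averages collapse just as in the patched implementation:

import scala.collection.mutable.ArrayBuffer

def pava(y: Array[Double], w: Array[Double]): Array[Double] = {
  // Each entry is the (average, weight, size) of a resolved block, in input order.
  val blocks = ArrayBuffer.empty[(Double, Double, Int)]
  for (i <- y.indices) {
    var avg = y(i); var weight = w(i); var size = 1
    // Pool while the previous block violates monotonicity; >= also merges ties.
    while (blocks.nonEmpty && blocks.last._1 >= avg) {
      val (a2, w2, s2) = blocks.remove(blocks.length - 1)
      avg = (avg * weight + a2 * w2) / (weight + w2)
      weight += w2
      size += s2
    }
    blocks += ((avg, weight, size))
  }
  blocks.toArray.flatMap { case (a, _, s) => Array.fill(s)(a) }
}

pava(Array(3.0, 1.0, 2.0), Array(1.0, 1.0, 1.0))  // Array(2.0, 2.0, 2.0)

Unlike the implementation above, this sketch returns one value per input point instead of compressing each block down to its two boundary points.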

mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.mllib.regression

import org.scalatest.Matchers

import org.apache.spark.SparkFunSuite
import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.util.TestingUtils._
import org.apache.spark.util.Utils
@@ -163,17 +163,16 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext with Matchers {
}

test("weighted isotonic regression with negative weights") {
val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5), true)

assert(model.boundaries === Array(0.0, 1.0, 4.0))
assert(model.predictions === Array(1.0, 10.0/6, 10.0/6))
val ex = intercept[SparkException] {
runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5), true)
}
assert(ex.getCause.isInstanceOf[IllegalArgumentException])
}

test("weighted isotonic regression with zero weights") {
val model = runIsotonicRegression(Seq[Double](1, 2, 3, 2, 1), Seq[Double](0, 0, 0, 1, 0), true)

assert(model.boundaries === Array(0.0, 1.0, 4.0))
assert(model.predictions === Array(1, 2, 2))
val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1, 0), Seq(0, 0, 0, 1, 1, 0), true)
assert(model.boundaries === Array(3, 4))
assert(model.predictions === Array(1.5, 1.5))
}
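A hypothetical companion test (an editor's sketch, not part of this patch) could pin down the tie-merging behavior: because the implementation pools on >= rather than >, adjacent blocks with equal averages collapse into one, and only that block's two boundary points survive compression.

test("isotonic regression merges blocks with equal averages") {
  // All labels are equal, so every point pools into a single block [0, 2];
  // the output keeps only that block's two boundary points.
  val model = runIsotonicRegression(Seq(2, 2, 2), Seq(1, 1, 1), true)
  assert(model.boundaries === Array(0.0, 2.0))
  assert(model.predictions === Array(2.0, 2.0))
}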

test("SPARK-16426 isotonic regression with duplicate features that produce NaNs") {
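For context, a minimal end-to-end use of the public API touched by this patch (an editor's sketch; it assumes an existing SparkContext named sc):

import org.apache.spark.mllib.regression.IsotonicRegression

// (label, feature, weight) triples; with this patch, weights must be non-negative.
val data = sc.parallelize(Seq((1.0, 1.0, 1.0), (3.0, 2.0, 1.0), (2.0, 3.0, 1.0)))

val model = new IsotonicRegression().setIsotonic(true).run(data)
// The violating pair (label 3.0 at x = 2.0, label 2.0 at x = 3.0) is pooled to 2.5.
println(model.predict(2.5))  // 2.5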