Skip to content

[SPARK-6267] [MLLIB] Python API for IsotonicRegression #5890

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,24 @@ private[python] class PythonMLLibAPI extends Serializable {
map(_.asInstanceOf[Object]).asJava
}

/**
 * Java stub for Python mllib IsotonicRegression.run()
 *
 * @param data RDD of 3-element vectors, each unpacked into the
 *             (label, feature, weight) triple expected by IsotonicRegression.run().
 * @param isotonic true for isotonic (monotonically increasing) regression,
 *                 false for antitonic (monotonically decreasing).
 * @return a two-element Java list: the model's boundary vector and prediction vector.
 */
def trainIsotonicRegressionModel(
    data: JavaRDD[Vector],
    isotonic: Boolean): JList[Object] = {
  val isotonicRegressionAlg = new IsotonicRegression().setIsotonic(isotonic)
  val input = data.rdd.map { x =>
    (x(0), x(1), x(2))
  }.persist(StorageLevel.MEMORY_AND_DISK)
  try {
    val model = isotonicRegressionAlg.run(input)
    List[AnyRef](model.boundaryVector, model.predictionVector).asJava
  } finally {
    // Release the RDD that was actually cached above. The previous code called
    // data.rdd.unpersist(...), which is a no-op on the un-cached upstream RDD and
    // left the persisted `input` RDD occupying block-manager storage.
    input.unpersist(blocking = false)
  }
}

/**
* Java stub for Python mllib KMeans.run()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@ import java.io.Serializable
import java.lang.{Double => JDouble}
import java.util.Arrays.binarySearch

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.SparkContext
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.util.{Loader, Saveable}
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.SQLContext

/**
* :: Experimental ::
Expand All @@ -57,6 +59,13 @@ class IsotonicRegressionModel (
assertOrdered(boundaries)
assertOrdered(predictions)(predictionOrd)

/**
 * A Java-friendly constructor that takes two Iterable parameters and one Boolean parameter.
 *
 * @param boundaries boundaries for which predictions are known; the primary constructor
 *                   asserts they are in increasing order
 * @param predictions predictions associated with the boundary at the same position
 * @param isotonic whether the model is isotonic (increasing) or antitonic (decreasing)
 */
def this(boundaries: java.lang.Iterable[Double],
predictions: java.lang.Iterable[Double],
isotonic: java.lang.Boolean) = {
this(boundaries.asScala.toArray, predictions.asScala.toArray, isotonic)
}

/** Asserts the input array is monotone with the given ordering. */
private def assertOrdered(xs: Array[Double])(implicit ord: Ordering[Double]): Unit = {
var i = 1
Expand Down Expand Up @@ -132,6 +141,12 @@ class IsotonicRegressionModel (
}
}

/** A convenient method exposing the boundaries array as a dense Vector for the Python API. */
private[mllib] def boundaryVector: Vector = Vectors.dense(boundaries)

/** A convenient method exposing the predictions array as a dense Vector for the Python API. */
private[mllib] def predictionVector: Vector = Vectors.dense(predictions)

/** Saves this model's boundaries, predictions and isotonic flag to `path` (v1.0 format). */
override def save(sc: SparkContext, path: String): Unit = {
IsotonicRegressionModel.SaveLoadV1_0.save(sc, path, boundaries, predictions, isotonic)
}
Expand Down
73 changes: 71 additions & 2 deletions python/pyspark/mllib/regression.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
import numpy as np
from numpy import array

from pyspark import RDD
from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py, inherit_doc
from pyspark.mllib.linalg import SparseVector, _convert_to_vector
from pyspark.mllib.linalg import SparseVector, Vectors, _convert_to_vector
from pyspark.mllib.util import Saveable, Loader

__all__ = ['LabeledPoint', 'LinearModel',
'LinearRegressionModel', 'LinearRegressionWithSGD',
'RidgeRegressionModel', 'RidgeRegressionWithSGD',
'LassoModel', 'LassoWithSGD']
'LassoModel', 'LassoWithSGD', 'IsotonicRegressionModel',
'IsotonicRegression']


class LabeledPoint(object):
Expand Down Expand Up @@ -396,6 +398,73 @@ def train(rdd, i):
return _regression_train_wrapper(train, RidgeRegressionModel, data, initialWeights)


class IsotonicRegressionModel(Saveable, Loader):

    """Regression model for isotonic regression.

    >>> data = [(1, 0, 1), (2, 1, 1), (3, 2, 1), (1, 3, 1), (6, 4, 1), (17, 5, 1), (16, 6, 1)]
    >>> irm = IsotonicRegression.train(sc.parallelize(data))
    >>> irm.predict(3)
    2.0
    >>> irm.predict(5)
    16.5
    >>> irm.predict(sc.parallelize([3, 5])).collect()
    [2.0, 16.5]
    >>> import os, tempfile
    >>> path = tempfile.mkdtemp()
    >>> irm.save(sc, path)
    >>> sameModel = IsotonicRegressionModel.load(sc, path)
    >>> sameModel.predict(3)
    2.0
    >>> sameModel.predict(5)
    16.5
    >>> try:
    ...     os.removedirs(path)
    ... except OSError:
    ...     pass
    """

    def __init__(self, boundaries, predictions, isotonic):
        # boundaries: feature values at which predictions change (numpy array,
        # increasing); predictions: model output at each boundary; isotonic:
        # the increasing/decreasing flag the model was trained with.
        self.boundaries = boundaries
        self.predictions = predictions
        self.isotonic = isotonic

    def predict(self, x):
        """Predict the label for a single feature, or map over an RDD of features.

        Scalar predictions are linear interpolation between the two nearest
        boundaries (constant extrapolation outside the boundary range).
        """
        if isinstance(x, RDD):
            return x.map(lambda feature: self.predict(feature))
        return np.interp(x, self.boundaries, self.predictions)

    def save(self, sc, path):
        """Persist this model under `path` by delegating to the JVM model's save()."""
        jvm_model_cls = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel
        java_model = jvm_model_cls(
            _py2java(sc, self.boundaries.tolist()),
            _py2java(sc, self.predictions.tolist()),
            self.isotonic)
        java_model.save(sc._jsc.sc(), path)

    @classmethod
    def load(cls, sc, path):
        """Load a model previously written by save() and rebuild the Python wrapper."""
        jvm_model_cls = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel
        java_model = jvm_model_cls.load(sc._jsc.sc(), path)
        boundaries = _java2py(sc, java_model.boundaryVector()).toArray()
        predictions = _java2py(sc, java_model.predictionVector()).toArray()
        return IsotonicRegressionModel(boundaries, predictions, java_model.isotonic)


class IsotonicRegression(object):
    """
    Run the IsotonicRegression algorithm to obtain an isotonic regression model.
    """
    @classmethod
    def train(cls, data, isotonic=True):
        """Train an isotonic regression model on the given data.

        :param data: RDD of (label, feature, weight) tuples.
        :param isotonic: Whether this is isotonic (True) or antitonic (False).
        """
        vectors = data.map(_convert_to_vector)
        boundaries, predictions = callMLlibFunc(
            "trainIsotonicRegressionModel", vectors, bool(isotonic))
        return IsotonicRegressionModel(
            boundaries.toArray(), predictions.toArray(), isotonic)


def _test():
import doctest
from pyspark import SparkContext
Expand Down