Skip to content

[SPARK-3181][MLLIB]: Add Robust Regression Algorithm with Huber Estimator #2096

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 10 commits into from
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.mllib

import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.regression.HuberRobustRegressionWithLBFGS
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.optimization.{SimpleUpdater, SquaredL2Updater, L1Updater}

/**
* An example app for Huber Robust regression. Run with
* {{{
* bin/run-example org.apache.spark.examples.mllib.HuberRobustRegression
* }}}
* A synthetic dataset can be found at `data/mllib/sample_linear_regression_data.txt`.
* If you use it as a template to create your own app, please use `spark-submit` to submit your app.
*/
object HuberRobustRegression extends App {

object RegType extends Enumeration {
type RegType = Value
val NONE, L1, L2 = Value
}

import RegType._

case class Params(
input: String = null,
numIterations: Int = 100,
stepSize: Double = 1.0,
regType: RegType = L2,
regParam: Double = 0.1)

val defaultParams = Params()

val parser = new OptionParser[Params]("HuberRobustRegression") {
head("HuberRobustRegression: an example app for Huber M-Estimation Robust regression.")
opt[Int]("numIterations")
.text("number of iterations")
.action((x, c) => c.copy(numIterations = x))
opt[Double]("stepSize")
.text(s"initial step size, default: ${defaultParams.stepSize}")
.action((x, c) => c.copy(stepSize = x))
opt[String]("regType")
.text(s"regularization type (${RegType.values.mkString(",")}), " +
s"default: ${defaultParams.regType}")
.action((x, c) => c.copy(regType = RegType.withName(x)))
opt[Double]("regParam")
.text(s"regularization parameter, default: ${defaultParams.regParam}")
arg[String]("<input>")
.required()
.text("input paths to labeled examples in LIBSVM format")
.action((x, c) => c.copy(input = x))
note(
"""
|For example, the following command runs this app on a synthetic dataset:
|
| bin/spark-submit --class org.apache.spark.examples.mllib.HuberRobustRegression \
| examples/target/scala-*/spark-examples-*.jar \
| data/mllib/sample_linear_regression_data.txt
""".stripMargin)
}

parser.parse(args, defaultParams).map { params =>
run(params)
} getOrElse {
sys.exit(1)
}

// If no parameter set SimpleUpdater will be used.
def run(params: Params) {
val conf = new SparkConf().setAppName(s"HuberRobustRegression with $params")
val sc = new SparkContext(conf)

Logger.getRootLogger.setLevel(Level.WARN)

val examples = MLUtils.loadLibSVMFile(sc, params.input).cache()

val splits = examples.randomSplit(Array(0.8, 0.2))
val training = splits(0).cache()
val test = splits(1).cache()

val numTraining = training.count()
val numTest = test.count()
println(s"Training: $numTraining, test: $numTest.")

examples.unpersist(blocking = false)

val updater = params.regType match {
case NONE => new SimpleUpdater()
case L1 => new L1Updater()
case L2 => new SquaredL2Updater()
}

val algorithm = new HuberRobustRegressionWithLBFGS()
algorithm.optimizer
.setNumIterations(params.numIterations)
.setUpdater(updater)
.setRegParam(params.regParam)

val model = algorithm.run(training)

val prediction = model.predict(test.map(_.features))
val predictionAndLabel = prediction.zip(test.map(_.label))

val loss = predictionAndLabel.map { case (p, l) =>
val err = p - l
err * err
}.reduce(_ + _)
val rmse = math.sqrt(loss / numTest)

println(s"Test RMSE = $rmse.")

sc.stop()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.mllib.optimization
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.BLAS.{axpy, dot, scal}
import scala.math.pow

/**
* :: DeveloperApi ::
Expand Down Expand Up @@ -118,6 +119,69 @@ class LeastSquaresGradient extends Gradient {
}
}

/**
* :: DeveloperApi ::
* Compute gradient and loss for Huber objective function, as used in robust regression.
* The Huber M-estimator corresponds to a probability distribution for the errors which is normal
* in the centre but like a double exponential distribution in the tails (Hogg 1979: 109).
* L = 1/2 ||A weights-y||^2 if |A weights-y| <= k
* L = k |A weights-y| - 1/2 K^2 if |A weights-y| > k
* where k = 1.345 which produce 95% efficiency when the errors are normal and
* substantial resistance to outliers otherwise.
* See also the documentation for the precise formulation.
*/
@DeveloperApi
class HuberRobustGradient extends Gradient {
override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
val diff = dot(data, weights) - label
val loss = diff * diff
val gradient = data.copy
val k = 1.345

/**
* Tuning constant is generally picked to give reasonably high efficiency in the normal case.
* Smaller values produce more resistance to outliers while at the expense of
* lower efficiency when the errors are normally distributed.
*/

if(diff < -k){
scal(-k, gradient)
(gradient, (-k * diff - 1.0 / 2.0 * pow(k, 2)))
}else if(diff >= -k && diff <= k){
scal(diff, gradient)
(gradient, (1.0 / 2.0 * loss))
}else {
scal(k, gradient)
(gradient, (k * diff - 1.0 / 2.0 * pow(k, 2)))
}
}

override def compute(
data: Vector,
label: Double,
weights: Vector,
cumGradient: Vector): Double = {
val diff = dot(data, weights) - label
val loss = diff * diff
val k = 1.345

if(diff < -k){
axpy(-k, data, cumGradient)
}else if(diff >= -k && diff <= k){
axpy(diff, data, cumGradient)
}else {
axpy(k, data, cumGradient)
}
if(diff < -k){
-k * diff - 1.0 / 2.0 * pow(k, 2)
}else if(diff >= -k && diff <= k){
1.0 / 2.0 * loss
}else {
k * diff - 1.0 /2.0 * pow(k, 2)
}
}
}

/**
* :: DeveloperApi ::
* Compute gradient and loss for a Hinge loss function, as used in SVM binary classification.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.regression

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.optimization._
import org.apache.spark.annotation.Experimental

/**
* Regression model trained using Huber M-Estimation RobustRegression.
*
* @param weights Weights computed for every feature.
* @param intercept Intercept computed for this model.
*/
class HuberRobustRegressionModel (
override val weights: Vector,
override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {

override protected def predictPoint(
dataMatrix: Vector,
weightMatrix: Vector,
intercept: Double): Double = {
weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
}
}

/**
* Train a Huber Robust regression model with no regularization using Stochastic Gradient Descent.
* This solves the Huber objective function
* f(weights) = 1/2 ||A weights-y||^2 if |A weights-y| <= k
* f(weights) = k |A weights-y| - 1/2 K^2 if |A weights-y| > k
* where k = 1.345 which produce 95% efficiency when the errors are normal and
* substantial resistance to outliers otherwise.
* Here the data matrix has n rows, and the input RDD holds the set of rows of A, each with
* its corresponding right hand side label y.
* See also the documentation for the precise formulation.
*/
class HuberRobustRegressionWithSGD private[mllib] (
private var stepSize: Double,
private var numIterations: Int,
private var miniBatchFraction: Double)
extends GeneralizedLinearAlgorithm[HuberRobustRegressionModel] with Serializable {

private val gradient = new HuberRobustGradient()
private val updater = new SimpleUpdater()
override val optimizer = new GradientDescent(gradient, updater)
.setStepSize(stepSize)
.setNumIterations(numIterations)
.setMiniBatchFraction(miniBatchFraction)

/**
* Construct a Huber M-Estimation RobustRegression object with default parameters: {stepSize: 1.0,
* numIterations: 100, miniBatchFraction: 1.0}.
*/
def this() = this(1.0, 100, 1.0)

override protected def createModel(weights: Vector, intercept: Double) = {
new HuberRobustRegressionModel(weights, intercept)
}
}

/**
* Top-level methods for calling HuberRobustRegression.
*/
object HuberRobustRegressionWithSGD {

/**
* Train a HuberRobust Regression model given an RDD of (label, features) pairs. We run a fixed
* number of iterations of gradient descent using the specified step size. Each iteration uses
* `miniBatchFraction` fraction of the data to calculate a stochastic gradient. The weights used
* in gradient descent are initialized using the initial weights provided.
*
* @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
* matrix A as well as the corresponding right hand side label y
* @param numIterations Number of iterations of gradient descent to run.
* @param stepSize Step size to be used for each iteration of gradient descent.
* @param miniBatchFraction Fraction of data to be used per iteration.
* @param initialWeights Initial set of weights to be used. Array should be equal in size to
* the number of features in the data.
*/
def train(
input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double,
initialWeights: Vector): HuberRobustRegressionModel = {
new HuberRobustRegressionWithSGD(stepSize, numIterations, miniBatchFraction)
.run(input, initialWeights)
}

/**
* Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed
* number of iterations of gradient descent using the specified step size. Each iteration uses
* `miniBatchFraction` fraction of the data to calculate a stochastic gradient.
*
* @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
* matrix A as well as the corresponding right hand side label y
* @param numIterations Number of iterations of gradient descent to run.
* @param stepSize Step size to be used for each iteration of gradient descent.
* @param miniBatchFraction Fraction of data to be used per iteration.
*/
def train(
input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double): HuberRobustRegressionModel = {
new HuberRobustRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input)
}

/**
* Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed
* number of iterations of gradient descent using the specified step size. We use the entire
* data set to compute the true gradient in each iteration.
*
* @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
* matrix A as well as the corresponding right hand side label y
* @param stepSize Step size to be used for each iteration of Gradient Descent.
* @param numIterations Number of iterations of gradient descent to run.
* @return a HuberRobustRegressionModel which has the weights and offset from training.
*/
def train(
input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double): HuberRobustRegressionModel = {
train(input, numIterations, stepSize, 1.0)
}

/**
* Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed
* number of iterations of gradient descent using a step size of 1.0. We use the entire data
* set to compute the true gradient in each iteration.
*
* @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
* matrix A as well as the corresponding right hand side label y
* @param numIterations Number of iterations of gradient descent to run.
* @return a HuberRobustRegressionModel which has the weights and offset from training.
*/
def train(
input: RDD[LabeledPoint],
numIterations: Int): HuberRobustRegressionModel = {
train(input, numIterations, 1.0, 1.0)
}
}

/**
* Train a classification model for HuberRobust Regression using Limited-memory BFGS.
* Standard feature scaling and L2 regularization are used by default.
*/
class HuberRobustRegressionWithLBFGS
extends GeneralizedLinearAlgorithm[HuberRobustRegressionModel] with Serializable {

this.setFeatureScaling(true)

override val optimizer = new LBFGS(new HuberRobustGradient, new SquaredL2Updater)

override protected def createModel(weights: Vector, intercept: Double) = {
new HuberRobustRegressionModel(weights, intercept)
}
}
Loading