
Commit 3de71d0

SPARK-3278 added initial version of Isotonic regression algorithm including proposed API
1 parent c6e0c2a commit 3de71d0

2 files changed: +388 -0 lines changed
File 1 of 2: 268 additions & 0 deletions
@@ -0,0 +1,268 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.mllib.regression

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

sealed trait MonotonicityConstraint {
  def holds(current: LabeledPoint, next: LabeledPoint): Boolean
}

case object Isotonic extends MonotonicityConstraint {
  override def holds(current: LabeledPoint, next: LabeledPoint): Boolean = {
    current.label <= next.label
  }
}

case object Antitonic extends MonotonicityConstraint {
  override def holds(current: LabeledPoint, next: LabeledPoint): Boolean = {
    current.label >= next.label
  }
}
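// Illustrative behaviour of the two constraints on a pair of in-order points (a sketch,
// not part of the committed file; assumes Vectors from org.apache.spark.mllib.linalg):
//   val a = LabeledPoint(1.0, Vectors.dense(1.0))
//   val b = LabeledPoint(2.0, Vectors.dense(2.0))
//   Isotonic.holds(a, b)   // true:  labels may stay equal or increase
//   Antitonic.holds(a, b)  // false: labels may stay equal or decrease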
/**
 * Regression model for isotonic regression.
 *
 * @param predictions Sequence of pooled (label, feature) points produced by the algorithm,
 *                    ordered by the first feature.
 * @param monotonicityConstraint Isotonic (non-decreasing) or Antitonic (non-increasing).
 */
class IsotonicRegressionModel(
    val predictions: Seq[LabeledPoint],
    val monotonicityConstraint: MonotonicityConstraint)
  extends RegressionModel {

  override def predict(testData: RDD[Vector]): RDD[Double] =
    testData.map(predict)

  // Return the label of the last prediction whose feature is <= the queried feature,
  // falling back to the first prediction when the query is below all known features.
  override def predict(testData: Vector): Double =
    (predictions.head +: predictions.filter(y => y.features.toArray.head <= testData.toArray.head))
      .last.label
}
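// Worked sketch of the prediction rule above (illustrative values, not from the test
// suite): with predictions at features 1.0, 2.0, 3.0 carrying labels 1.0, 2.0, 5.0,
//   predict(Vectors.dense(2.5))  // 2.0: last prediction with feature <= 2.5
//   predict(Vectors.dense(0.5))  // 1.0: below all known features, falls back to the head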
trait IsotonicRegressionAlgorithm
  extends Serializable {

  protected def createModel(
      weights: Seq[LabeledPoint],
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel

  def run(
      input: RDD[LabeledPoint],
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel

  def run(
      input: RDD[LabeledPoint],
      initialWeights: Vector,
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel
}
class PoolAdjacentViolators extends IsotonicRegressionAlgorithm {

  override def run(
      input: RDD[LabeledPoint],
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel =
    createModel(parallelPoolAdjacentViolators(input, monotonicityConstraint), monotonicityConstraint)

  // Training from initial weights is not yet implemented in this version.
  override def run(
      input: RDD[LabeledPoint],
      initialWeights: Vector,
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = ???

  override protected def createModel(
      weights: Seq[LabeledPoint],
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel =
    new IsotonicRegressionModel(weights, monotonicityConstraint)

  // Performs the pool adjacent violators algorithm (PAVA).
  // Processes the data in a single pass; violators introduced into previously processed
  // data by pooling are fixed immediately. Optimized to discover whole
  // monotonicity-violating sequences at once. Mutates the input array in place.
  private def poolAdjacentViolators(
      in: Array[LabeledPoint],
      monotonicityConstraint: MonotonicityConstraint): Array[LabeledPoint] = {

    // Pools the sub-array within the given bounds, assigning the weighted average value
    // to all of its elements.
    def pool(in: Array[LabeledPoint], start: Int, end: Int): Unit = {
      val poolSubArray = in.slice(start, end + 1)

      val weightedSum = poolSubArray.map(_.label).sum
      val weight = poolSubArray.length

      for (i <- start to end) {
        in(i) = LabeledPoint(weightedSum / weight, in(i).features)
      }
    }

    var i = 0

    while (i < in.length) {
      var j = i

      // Find a monotonicity-violating sequence, if any.
      while (j < in.length - 1 && !monotonicityConstraint.holds(in(j), in(j + 1))) {
        j = j + 1
      }

      // If monotonicity was not violated, move to the next data point.
      if (i == j) {
        i = i + 1
      } else {
        // Otherwise pool the violating sequence, and check whether pooling caused a
        // monotonicity violation in previously processed points.
        while (i >= 0 && !monotonicityConstraint.holds(in(i), in(i + 1))) {
          pool(in, i, j)
          i = i - 1
        }

        i = j
      }
    }

    in
  }
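  // Worked trace of poolAdjacentViolators on labels (1.0, 3.0, 2.0) under Isotonic
  // (an illustrative sketch, not part of the committed file):
  //   i = 0: 1.0 <= 3.0 holds, so i advances to 1.
  //   i = 1: 3.0 <= 2.0 fails, so j advances to 2 and pool(in, 1, 2) averages the
  //          pair to (2.5, 2.5); 1.0 <= 2.5 still holds, so nothing earlier re-pools.
  //   Result: labels (1.0, 2.5, 2.5).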
  // Two-stage parallel PAVA: sort by the first feature, run PAVA within each partition,
  // then collect the partial results and run a final PAVA pass over them on the driver.
  private def parallelPoolAdjacentViolators(
      testData: RDD[LabeledPoint],
      monotonicityConstraint: MonotonicityConstraint): Seq[LabeledPoint] = {
    poolAdjacentViolators(
      testData
        .sortBy(_.features.toArray.head)
        .mapPartitions(it => poolAdjacentViolators(it.toArray, monotonicityConstraint).toIterator)
        .collect(), monotonicityConstraint)
  }
}
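// Sketch of the two-stage merge (illustrative, not part of the committed file): if the
// sorted labels (3.0, 1.0, 2.0, 4.0) land in partitions (3.0, 1.0) and (2.0, 4.0), the
// per-partition PAVA passes yield (2.0, 2.0) and (2.0, 4.0); the collected sequence
// (2.0, 2.0, 2.0, 4.0) is already isotonic, so the final driver-side pass keeps it as is.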
/**
 * Top-level methods for calling IsotonicRegression.
 */
object IsotonicRegression {

  /**
   * Train an isotonic regression model given an RDD of (label, features) pairs, starting
   * from the provided initial weights. Note: the underlying run overload is not yet
   * implemented in this version.
   *
   * @param input RDD of (label, array of features) pairs. The first feature of each point
   *              determines its position in the ordered sequence.
   * @param initialWeights Initial set of weights to be used. Array should be equal in size to
   *                       the number of features in the data.
   * @param monotonicityConstraint Isotonic (non-decreasing) or Antitonic (non-increasing).
   */
  def train(
      input: RDD[LabeledPoint],
      initialWeights: Vector,
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
    new PoolAdjacentViolators().run(input, initialWeights, monotonicityConstraint)
  }

  /**
   * Train an isotonic regression model given an RDD of (label, features) pairs using the
   * pool adjacent violators algorithm.
   *
   * @param input RDD of (label, array of features) pairs. The first feature of each point
   *              determines its position in the ordered sequence.
   * @param monotonicityConstraint Isotonic (non-decreasing) or Antitonic (non-increasing).
   */
  def train(
      input: RDD[LabeledPoint],
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
    new PoolAdjacentViolators().run(input, monotonicityConstraint)
  }
}
// Alternative implementations of PAVA explored during development, kept commented out
// for reference while the API is under review:

/*def functionalOption(in: Array[LabeledPoint], monotonicityConstraint: MonotonicityConstraint): Array[LabeledPoint] = {
  def pool2(in: Array[LabeledPoint]): Array[LabeledPoint] =
    in.map(p => LabeledPoint(in.map(_.label).sum / in.length, p.features))

  def iterate(checked: Array[LabeledPoint], remaining: Array[LabeledPoint], monotonicityConstraint: MonotonicityConstraint): Array[LabeledPoint] = {
    if (remaining.size < 2) {
      checked ++ remaining
    } else {
      val newRemaining = if (remaining.size == 2) Array[LabeledPoint]() else remaining.slice(2, remaining.length)

      if (!monotonicityConstraint.holds(remaining.head, remaining.tail.head)) {
        iterate(checked ++ pool2(remaining.slice(0, 2)), newRemaining, monotonicityConstraint)
      } else {
        iterate(checked ++ remaining.slice(0, 2), newRemaining, monotonicityConstraint)
      }
    }
  }

  iterate(Array(), in, monotonicityConstraint)
}

functionalOption(in, monotonicityConstraint)*/

/*def option1(in: Array[LabeledPoint], monotonicityConstraint: MonotonicityConstraint) = {
  def findMonotonicityViolators(in: Array[LabeledPoint], start: Int, monotonicityConstraint: MonotonicityConstraint): Unit = {
    var j = start

    while (j >= 1 && !monotonicityConstraint.holds(in(j - 1), in(j))) {
      pool(in, j - 1, start + 1)
      j = j - 1
    }
  }

  for (i <- 0 to in.length - 1) {
    findMonotonicityViolators(in, i, monotonicityConstraint)
  }

  in
}*/

/*
def pool(in: Array[LabeledPoint], start: Int, end: Int): Unit = {
  val subArraySum = in.slice(start, end).map(_.label).sum
  val subArrayLength = math.abs(start - end)

  for (i <- start to end - 1) {
    in(i) = LabeledPoint(subArraySum / subArrayLength, in(i).features)
  }
}*/

/*
OPTION 2:

def pool(in: Array[LabeledPoint], range: Range): Unit = {
  val subArray = in.slice(range.start, range.end + 1)

  val subArraySum = subArray.map(_.label).sum
  val subArrayLength = subArray.length

  for (i <- range.start to range.end) {
    in(i) = LabeledPoint(subArraySum / subArrayLength, in(i).features)
  }
}

def poolExtendedViolators(in: Array[LabeledPoint], range: Range, monotonicityConstraint: MonotonicityConstraint): Unit = {
  var extendedRange = Range(range.start, range.end)

  while (extendedRange.start >= 0 && !monotonicityConstraint.holds(in(extendedRange.start), in(extendedRange.start + 1))) {
    pool(in, Range(extendedRange.start, extendedRange.end))
    extendedRange = Range(extendedRange.start - 1, extendedRange.end)
  }
}

def findViolatingSequence(in: Array[LabeledPoint], start: Int, monotonicityConstraint: MonotonicityConstraint): Option[Range] = {
  var j = start

  while (j < in.length - 1 && !monotonicityConstraint.holds(in(start), in(j + 1))) {
    j = j + 1
  }

  if (j == start) {
    None
  } else {
    Some(Range(start, j))
  }
}

var i = 0

while (i < in.length) {
  findViolatingSequence(in, i, monotonicityConstraint).fold[Unit]({
    i = i + 1
  })(r => {
    poolExtendedViolators(in, r, monotonicityConstraint)
    i = r.end
  })
}

in
*/
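A minimal usage sketch of the proposed API (illustrative only; it assumes a live
SparkContext named sc and that the classes above are on the classpath):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{Isotonic, IsotonicRegression, LabeledPoint}

// Labels (1, 3, 2, 5, 4) keyed by features 1 through 5; not monotone as given.
val data = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(1.0)),
  LabeledPoint(3.0, Vectors.dense(2.0)),
  LabeledPoint(2.0, Vectors.dense(3.0)),
  LabeledPoint(5.0, Vectors.dense(4.0)),
  LabeledPoint(4.0, Vectors.dense(5.0))))

// PAVA pools the two violating pairs: predictions become (1.0, 2.5, 2.5, 4.5, 4.5).
val model = IsotonicRegression.train(data, Isotonic)

// Point queries use the rightmost known feature at or below the query value.
model.predict(Vectors.dense(3.5)) // 2.5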
File 2 of 2: 120 additions & 0 deletions
@@ -0,0 +1,120 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.mllib.regression

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
import org.scalatest.{FunSuite, Matchers}

import scala.util.Random

class IsotonicRegressionSuite
  extends FunSuite
  with MLlibTestSparkContext
  with Matchers {

  // Turns a list of labels into LabeledPoints whose single feature is the 1-based
  // position, so the generated points are already ordered by feature.
  def generateDataPoints(labels: Double*): Seq[LabeledPoint] =
    labels.zip(1 to labels.size).map(point => LabeledPoint(point._1, Vectors.dense(point._2)))

  test("increasing isotonic regression") {
    val testRDD = sc.parallelize(
      generateDataPoints(1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12, 14, 15, 17, 16, 17, 18, 19, 20)).cache()

    val alg = new PoolAdjacentViolators
    val model = alg.run(testRDD, Isotonic)

    // (3, 3, 1) pools to 7/3, (11, 9, 10) pools to 10, and (17, 16) pools to 16.5.
    model.predictions should be(
      generateDataPoints(1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20))
  }

  test("isotonic regression with size 0") {
    val testRDD = sc.parallelize(List[LabeledPoint]()).cache()

    val alg = new PoolAdjacentViolators
    val model = alg.run(testRDD, Isotonic)

    model.predictions should be(List())
  }

  test("isotonic regression with size 1") {
    val testRDD = sc.parallelize(generateDataPoints(1)).cache()

    val alg = new PoolAdjacentViolators
    val model = alg.run(testRDD, Isotonic)

    model.predictions should be(generateDataPoints(1))
  }

  test("isotonic regression strictly increasing sequence") {
    val testRDD = sc.parallelize(generateDataPoints(1, 2, 3, 4, 5)).cache()

    val alg = new PoolAdjacentViolators
    val model = alg.run(testRDD, Isotonic)

    model.predictions should be(generateDataPoints(1, 2, 3, 4, 5))
  }

  test("isotonic regression strictly decreasing sequence") {
    val testRDD = sc.parallelize(generateDataPoints(5, 4, 3, 2, 1)).cache()

    val alg = new PoolAdjacentViolators
    val model = alg.run(testRDD, Isotonic)

    // A fully decreasing sequence pools into a single block at the overall mean.
    model.predictions should be(generateDataPoints(3, 3, 3, 3, 3))
  }

  test("isotonic regression prediction") {
    val testRDD = sc.parallelize(generateDataPoints(1, 2, 7, 1, 2)).cache()

    val alg = new PoolAdjacentViolators
    val model = alg.run(testRDD, Isotonic)

    model.predict(Vectors.dense(0)) should be(1)
    model.predict(Vectors.dense(2)) should be(2)
    model.predict(Vectors.dense(3)) should be(10d/3)
    model.predict(Vectors.dense(10)) should be(10d/3)
  }

  test("antitonic regression prediction") {
    val testRDD = sc.parallelize(generateDataPoints(7, 5, 3, 5, 1)).cache()

    val alg = new PoolAdjacentViolators
    val model = alg.run(testRDD, Antitonic)

    model.predict(Vectors.dense(0)) should be(7)
    model.predict(Vectors.dense(2)) should be(5)
    model.predict(Vectors.dense(3)) should be(4)
    model.predict(Vectors.dense(10)) should be(1)
  }
}

class IsotonicRegressionClusterSuite extends FunSuite with LocalClusterSparkContext {

  test("task size should be small in both training and prediction") {
    val m = 4
    val n = 200000
    val points = sc.parallelize(0 until m, 2).mapPartitionsWithIndex { (idx, iter) =>
      val random = new Random(idx)
      iter.map(i => LabeledPoint(1.0, Vectors.dense(Array.fill(n)(random.nextDouble()))))
    }.cache()

    // If we serialized the data directly in the task closure, the serialized task would be
    // larger than 1MB and hence Spark would throw an error.
    val model = IsotonicRegression.train(points, Isotonic)
    val predictions = model.predict(points.map(_.features))
  }
}
