Skip to content

Commit 0b35c15

Browse files
committed
compress pools and update tests
1 parent 35d044e commit 0b35c15

File tree

2 files changed

+102
-33
lines changed

2 files changed

+102
-33
lines changed

mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import java.io.Serializable
2121
import java.lang.{Double => JDouble}
2222
import java.util.Arrays.binarySearch
2323

24+
import scala.collection.mutable.ArrayBuffer
25+
2426
import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
2527
import org.apache.spark.rdd.RDD
2628

@@ -208,9 +210,13 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
208210
private def poolAdjacentViolators(
209211
input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
210212

213+
if (input.isEmpty) {
214+
return Array.empty
215+
}
216+
211217
// Pools the sub-array within the given bounds, assigning the weighted average value to all of its elements.
212218
def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
213-
val poolSubArray = input.view.slice(start, end + 1)
219+
val poolSubArray = input.slice(start, end + 1)
214220

215221
val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum
216222
val weight = poolSubArray.map(_._3).sum
@@ -246,7 +252,35 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
246252
}
247253
}
248254

249-
input
255+
// For points having the same prediction, we only keep two boundary points.
256+
val compressed = ArrayBuffer.empty[(Double, Double, Double)]
257+
258+
var (curLabel, curFeature, curWeight) = input.head
259+
var rightBound = curFeature
260+
def merge(): Unit = {
261+
compressed += ((curLabel, curFeature, curWeight))
262+
if (rightBound > curFeature) {
263+
compressed += ((curLabel, rightBound, 0.0))
264+
}
265+
}
266+
i = 1
267+
while (i < input.length) {
268+
val (label, feature, weight) = input(i)
269+
if (label == curLabel) {
270+
curWeight += weight
271+
rightBound = feature
272+
} else {
273+
merge()
274+
curLabel = label
275+
curFeature = feature
276+
curWeight = weight
277+
rightBound = curFeature
278+
}
279+
i += 1
280+
}
281+
merge()
282+
283+
compressed.toArray
250284
}
251285

252286
/**

mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala

Lines changed: 66 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,13 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M
2929
}
3030

3131
private def generateIsotonicInput(labels: Seq[Double]): Seq[(Double, Double, Double)] = {
32-
labels.zip(1 to labels.size).map(point => (point._1, point._2.toDouble, 1d))
32+
Seq.tabulate(labels.size)(i => (labels(i), i.toDouble, 1d))
3333
}
3434

3535
private def generateIsotonicInput(
3636
labels: Seq[Double],
3737
weights: Seq[Double]): Seq[(Double, Double, Double)] = {
38-
labels.zip(1 to labels.size)
39-
.zip(weights)
40-
.map(point => (point._1._1, point._1._2.toDouble, point._2))
38+
Seq.tabulate(labels.size)(i => (labels(i), i.toDouble, weights(i)))
4139
}
4240

4341
private def runIsotonicRegression(
@@ -55,87 +53,123 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M
5553
}
5654

5755
test("increasing isotonic regression") {
58-
val model = runIsotonicRegression(Seq(1, 2, 3, 3, 1, 6, 17, 16, 17, 18), true)
59-
assert(model.predictions === Array(1, 2, 7d/3, 7d/3, 7d/3, 6, 16.5, 16.5, 17, 18))
56+
/*
57+
The following result can be reproduced with sklearn.
58+
59+
> from sklearn.isotonic import IsotonicRegression
60+
> x = range(9)
61+
> y = [1, 2, 3, 1, 6, 17, 16, 17, 18]
62+
> ir = IsotonicRegression().fit(x, y)
63+
> print ir.predict(x)
64+
65+
array([ 1. , 2. , 2. , 2. , 6. , 16.5, 16.5, 17. , 18. ])
66+
*/
67+
val model = runIsotonicRegression(Seq(1, 2, 3, 1, 6, 17, 16, 17, 18), true)
68+
69+
assert(Array.tabulate(9)(x => model.predict(x)) === Array(1, 2, 2, 2, 6, 16.5, 16.5, 17, 18))
70+
71+
assert(model.boundaries === Array(0, 1, 3, 4, 5, 6, 7, 8))
72+
assert(model.predictions === Array(1, 2, 2, 6, 16.5, 16.5, 17.0, 18.0))
73+
assert(model.isotonic)
6074
}
6175

6276
test("isotonic regression with size 0") {
6377
val model = runIsotonicRegression(Seq(), true)
78+
6479
assert(model.predictions === Array())
6580
}
6681

6782
test("isotonic regression with size 1") {
6883
val model = runIsotonicRegression(Seq(1), true)
84+
6985
assert(model.predictions === Array(1.0))
7086
}
7187

7288
test("isotonic regression strictly increasing sequence") {
7389
val model = runIsotonicRegression(Seq(1, 2, 3, 4, 5), true)
90+
7491
assert(model.predictions === Array(1, 2, 3, 4, 5))
7592
}
7693

7794
test("isotonic regression strictly decreasing sequence") {
7895
val model = runIsotonicRegression(Seq(5, 4, 3, 2, 1), true)
79-
assert(model.predictions === Array(3, 3, 3, 3, 3))
96+
97+
assert(model.boundaries === Array(0, 4))
98+
assert(model.predictions === Array(3, 3))
8099
}
81100

82101
test("isotonic regression with last element violating monotonicity") {
83102
val model = runIsotonicRegression(Seq(1, 2, 3, 4, 2), true)
84-
assert(model.predictions === Array(1, 2, 3, 3, 3))
103+
104+
assert(model.boundaries === Array(0, 1, 2, 4))
105+
assert(model.predictions === Array(1, 2, 3, 3))
85106
}
86107

87108
test("isotonic regression with first element violating monotonicity") {
88109
val model = runIsotonicRegression(Seq(4, 2, 3, 4, 5), true)
89-
assert(model.predictions === Array(3, 3, 3, 4, 5))
110+
111+
assert(model.boundaries === Array(0, 2, 3, 4))
112+
assert(model.predictions === Array(3, 3, 4, 5))
90113
}
91114

92115
test("isotonic regression with negative labels") {
93116
val model = runIsotonicRegression(Seq(-1, -2, 0, 1, -1), true)
94-
assert(model.predictions === Array(-1.5, -1.5, 0, 0, 0))
117+
118+
assert(model.boundaries === Array(0, 1, 2, 4))
119+
assert(model.predictions === Array(-1.5, -1.5, 0, 0))
95120
}
96121

97122
test("isotonic regression with unordered input") {
98-
val trainRDD = sc.parallelize(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse).cache()
123+
val trainRDD = sc.parallelize(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse, 2)
124+
99125
val model = new IsotonicRegression().run(trainRDD)
100126
assert(model.predictions === Array(1, 2, 3, 4, 5))
101127
}
102128

103129
test("weighted isotonic regression") {
104130
val model = runIsotonicRegression(Seq(1, 2, 3, 4, 2), Seq(1, 1, 1, 1, 2), true)
105-
assert(model.predictions === Array(1, 2, 2.75, 2.75,2.75))
131+
132+
assert(model.boundaries === Array(0, 1, 2, 4))
133+
assert(model.predictions === Array(1, 2, 2.75, 2.75))
106134
}
107135

108136
test("weighted isotonic regression with weights lower than 1") {
109137
val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(1, 1, 1, 0.1, 0.1), true)
110-
assert(model.predictions.map(round) === Array(1, 2, 3.3/1.2, 3.3/1.2, 3.3/1.2))
138+
139+
assert(model.boundaries === Array(0, 1, 2, 4))
140+
assert(model.predictions.map(round) === Array(1, 2, 3.3/1.2, 3.3/1.2))
111141
}
112142

113143
test("weighted isotonic regression with negative weights") {
114144
val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5), true)
115-
assert(model.predictions === Array(1.0, 10.0/6, 10.0/6, 10.0/6, 10.0/6))
145+
146+
assert(model.boundaries === Array(0.0, 1.0, 4.0))
147+
assert(model.predictions === Array(1.0, 10.0/6, 10.0/6))
116148
}
117149

118150
test("weighted isotonic regression with zero weights") {
119151
val model = runIsotonicRegression(Seq[Double](1, 2, 3, 2, 1), Seq[Double](0, 0, 0, 1, 0), true)
120-
assert(model.predictions === Array(1, 2, 2, 2, 2))
152+
153+
assert(model.boundaries === Array(0.0, 1.0, 4.0))
154+
assert(model.predictions === Array(1, 2, 2))
121155
}
122156

123157
test("isotonic regression prediction") {
124158
val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true)
125159

160+
assert(model.predict(-2) === 1)
126161
assert(model.predict(-1) === 1)
127-
assert(model.predict(0) === 1)
128-
assert(model.predict(1.5) === 1.5)
129-
assert(model.predict(1.75) === 1.75)
130-
assert(model.predict(2) === 2)
131-
assert(model.predict(3) === 10d/3)
132-
assert(model.predict(10) === 10d/3)
162+
assert(model.predict(0.5) === 1.5)
163+
assert(model.predict(0.75) === 1.75)
164+
assert(model.predict(1) === 2)
165+
assert(model.predict(2) === 10d/3)
166+
assert(model.predict(9) === 10d/3)
133167
}
134168

135169
test("isotonic regression prediction with duplicate features") {
136170
val trainRDD = sc.parallelize(
137171
Seq[(Double, Double, Double)](
138-
(2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1))).cache()
172+
(2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1)), 2)
139173
val model = new IsotonicRegression().run(trainRDD)
140174

141175
assert(model.predict(0) === 1)
@@ -147,7 +181,7 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M
147181
test("antitonic regression prediction with duplicate features") {
148182
val trainRDD = sc.parallelize(
149183
Seq[(Double, Double, Double)](
150-
(5, 1, 1), (6, 1, 1), (2, 2, 1), (4, 2, 1), (1, 3, 1), (2, 3, 1))).cache()
184+
(5, 1, 1), (6, 1, 1), (2, 2, 1), (4, 2, 1), (1, 3, 1), (2, 3, 1)), 2)
151185
val model = new IsotonicRegression().setIsotonic(false).run(trainRDD)
152186

153187
assert(model.predict(0) === 6)
@@ -158,21 +192,22 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M
158192

159193
test("isotonic regression RDD prediction") {
160194
val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true)
161-
val testRDD = sc.parallelize(List(-1.0, 0.0, 1.5, 1.75, 2.0, 3.0, 10.0)).cache()
162195

163-
assert(model.predict(testRDD).collect() === Array(1, 1, 1.5, 1.75, 2, 10.0/3, 10.0/3))
196+
val testRDD = sc.parallelize(List(-2.0, -1.0, 0.5, 0.75, 1.0, 2.0, 9.0), 2)
197+
val predictions = testRDD.map(x => (x, model.predict(x))).collect().sortBy(_._1).map(_._2)
198+
assert(predictions === Array(1, 1, 1.5, 1.75, 2, 10.0/3, 10.0/3))
164199
}
165200

166201
test("antitonic regression prediction") {
167202
val model = runIsotonicRegression(Seq(7, 5, 3, 5, 1), false)
168203

204+
assert(model.predict(-2) === 7)
169205
assert(model.predict(-1) === 7)
170-
assert(model.predict(0) === 7)
171-
assert(model.predict(1.5) === 6)
172-
assert(model.predict(1.75) === 5.5)
173-
assert(model.predict(2) === 5)
174-
assert(model.predict(3) === 4)
175-
assert(model.predict(10) === 1)
206+
assert(model.predict(0.5) === 6)
207+
assert(model.predict(0.75) === 5.5)
208+
assert(model.predict(1) === 5)
209+
assert(model.predict(2) === 4)
210+
assert(model.predict(9) === 1)
176211
}
177212

178213
test("model construction") {

0 commit comments

Comments
 (0)