Skip to content

Commit 1fff77d

Browse files
SPARK-3278 changes after PR comments apache#3519. Java api changes, test refactoring, comments and citations, isotonic regression model validations, linear interpolation for predictions
1 parent 12151e6 commit 1fff77d

File tree

5 files changed

+239
-340
lines changed

5 files changed

+239
-340
lines changed

mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala

Lines changed: 125 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -24,88 +24,142 @@ import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
2424
import org.apache.spark.rdd.RDD
2525

2626
/**
27-
* Regression model for Isotonic regression
27+
* Regression model for isotonic regression.
2828
*
29-
* @param features Array of features.
30-
* @param labels Array of labels associated to the features at the same index.
29+
* @param boundaries Array of boundaries for which predictions are known.
30+
* @param predictions Array of predictions associated to the boundaries at the same index.
3131
*/
3232
class IsotonicRegressionModel (
33-
features: Array[Double],
34-
val labels: Array[Double])
33+
boundaries: Array[Double],
34+
val predictions: Array[Double])
3535
extends Serializable {
3636

37+
/**
 * Returns true iff `xs` is sorted in non-decreasing order.
 *
 * BUG FIX: the original wrote `if (xs(i) < xs(i - 1)) false` inside the loop.
 * In Scala a non-final expression statement is evaluated and discarded, so the
 * `false` had no effect and the method unconditionally returned `true`,
 * letting unsorted boundary arrays pass model validation. The check must
 * short-circuit on the first violation, as done below.
 */
private def isSorted(xs: Array[Double]): Boolean =
  (1 until xs.length).forall(i => xs(i) >= xs(i - 1))
45+
46+
// Validate model invariants at construction time. `require` (not `assert`) is
// the idiomatic Scala choice for argument validation: it throws
// IllegalArgumentException with a message and is not removed by
// `-Xdisable-assertions`, so an invalid model can never be constructed silently.
require(isSorted(boundaries), "Array of boundaries has to be sorted.")
require(boundaries.length == predictions.length,
  "Arrays of boundaries and predictions must have the same length.")
48+
3749
/**
 * Predicts labels for the provided features using the fitted piecewise
 * linear function.
 *
 * @param testData Features to be labeled.
 * @return Predicted labels.
 */
def predict(testData: RDD[Double]): RDD[Double] =
  testData.map(x => predict(x))
4658

4759
/**
 * Predicts labels for the provided features using the fitted piecewise
 * linear function (Java-friendly variant).
 *
 * @param testData Features to be labeled.
 * @return Predicted labels.
 */
def predict(testData: JavaDoubleRDD): JavaDoubleRDD = {
  val scalaRdd = testData.rdd.asInstanceOf[RDD[Double]]
  JavaDoubleRDD.fromRDD(predict(scalaRdd))
}
5668

5769
/**
 * Predicts a single label using the fitted piecewise linear function.
 *
 * @param testData Feature to be labeled.
 * @return Predicted label:
 *         - if testData exactly matches a boundary, the associated prediction
 *           is returned directly;
 *         - if testData is lower (higher) than all boundaries, the first
 *           (last) prediction is returned;
 *         - if testData falls between two boundaries, the predictions are
 *           treated as a piecewise linear function and the interpolated value
 *           is returned.
 */
def predict(testData: Double): Double = {

  // Straight-line interpolation between two (boundary, prediction) points.
  def interpolate(x1: Double, y1: Double, x2: Double, y2: Double, x: Double): Double =
    y1 + (y2 - y1) * (x - x1) / (x2 - x1)

  val hit = binarySearch(boundaries, testData)

  if (hit >= 0) {
    // Exact boundary match.
    predictions(hit)
  } else {
    // Not found: binarySearch encodes the insertion point as -(point) - 1.
    val insertion = -hit - 1
    if (insertion == 0) {
      // Below every known boundary.
      predictions.head
    } else if (insertion == boundaries.length) {
      // Above every known boundary.
      predictions.last
    } else {
      // Between two boundaries: interpolate linearly.
      interpolate(
        boundaries(insertion - 1), predictions(insertion - 1),
        boundaries(insertion), predictions(insertion),
        testData)
    }
  }
}
78108
}
79109

80110
/**
81-
* Isotonic regression
82-
* Currently implemented using parallel pool adjacent violators algorithm
111+
* Isotonic regression.
112+
* Currently implemented using parallelized pool adjacent violators algorithm.
113+
* Currently only univariate (single feature) algorithm supported.
114+
*
115+
* Sequential PAV implementation based on:
116+
* Tibshirani, Ryan J., Holger Hoefling, and Robert Tibshirani.
117+
* "Nearly-isotonic regression." Technometrics 53.1 (2011): 54-61.
118+
*
119+
* Sequential PAV parallelized as per:
120+
* Kearsley, Anthony J., Richard A. Tapia, and Michael W. Trosset.
121+
* "An approach to parallelizing isotonic regression."
122+
* Applied Mathematics and Parallel Computing. Physica-Verlag HD, 1996. 141-147.
83123
*/
84-
class IsotonicRegression
85-
extends Serializable {
124+
class IsotonicRegression extends Serializable {
86125

87126
/**
 * Runs the pool adjacent violators algorithm to obtain an isotonic
 * regression model.
 *
 * @param input RDD of tuples (label, feature, weight) where label is the
 *              dependent variable for which we calculate isotonic regression,
 *              feature is the independent variable and weight represents the
 *              number of measures with default 1.
 * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
 * @return Isotonic regression model.
 */
def run(
    input: RDD[(Double, Double, Double)],
    isotonic: Boolean): IsotonicRegressionModel = {
  val pooled = parallelPoolAdjacentViolators(input, isotonic)
  createModel(pooled, isotonic)
}
140+
141+
/**
 * Runs the pool adjacent violators algorithm to obtain an isotonic
 * regression model (Java-friendly variant).
 *
 * @param input JavaRDD of tuples (label, feature, weight) where label is the
 *              dependent variable for which we calculate isotonic regression,
 *              feature is the independent variable and weight represents the
 *              number of measures with default 1.
 * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
 * @return Isotonic regression model.
 */
def run(
    input: JavaRDD[(java.lang.Double, java.lang.Double, java.lang.Double)],
    isotonic: Boolean): IsotonicRegressionModel = {
  val scalaInput = input.rdd.asInstanceOf[RDD[(Double, Double, Double)]]
  run(scalaInput, isotonic)
}
101155

102156
/**
103-
* Creates isotonic regression model with given parameters
157+
* Creates isotonic regression model with given parameters.
104158
*
105-
* @param predictions labels estimated using isotonic regression algorithm.
159+
* @param predictions Predictions calculated using pool adjacent violators algorithm.
106160
* Used for predictions on new data points.
107-
* @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
108-
* @return isotonic regression model
161+
* @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
162+
* @return Isotonic regression model.
109163
*/
110164
protected def createModel(
111165
predictions: Array[(Double, Double, Double)],
@@ -118,30 +172,30 @@ class IsotonicRegression
118172
}
119173

120174
/**
121-
* Performs a pool adjacent violators algorithm (PAVA)
175+
* Performs a pool adjacent violators algorithm (PAV).
122176
* Uses approach with single processing of data where violators
123177
in previously processed data created by pooling are fixed immediately.
124-
* Uses optimization of discovering monotonicity violating sequences (blocks)
125-
* Method in situ mutates input array
178+
* Uses optimization of discovering monotonicity violating sequences (blocks).
126179
*
127-
* @param in input data
128-
* @param isotonic asc or desc
129-
* @return result
180+
* @param input Input data of tuples (label, feature, weight).
181+
* @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
182+
* @return Result tuples (label, feature, weight) where labels were updated
183+
* to form a monotone sequence as per isotonic regression definition.
130184
*/
131185
private def poolAdjacentViolators(
132-
in: Array[(Double, Double, Double)],
186+
input: Array[(Double, Double, Double)],
133187
isotonic: Boolean): Array[(Double, Double, Double)] = {
134188

135-
// Pools sub array within given bounds assigning weighted average value to all elements
136-
def pool(in: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
137-
val poolSubArray = in.slice(start, end + 1)
189+
// Pools sub array within given bounds assigning weighted average value to all elements.
190+
def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
191+
val poolSubArray = input.slice(start, end + 1)
138192

139193
val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum
140194
val weight = poolSubArray.map(_._3).sum
141195

142196
var i = start
143197
while (i <= end) {
144-
in(i) = (weightedSum / weight, in(i)._2, in(i)._3)
198+
input(i) = (weightedSum / weight, input(i)._2, input(i)._3)
145199
i = i + 1
146200
}
147201
}
@@ -150,39 +204,40 @@ class IsotonicRegression
150204
(x, y) => if (isotonic) x <= y else x >= y
151205

152206
var i = 0
153-
while (i < in.length) {
207+
while (i < input.length) {
154208
var j = i
155209

156-
// Find monotonicity violating sequence, if any
157-
while (j < in.length - 1 && !monotonicityConstraintHolds(in(j)._1, in(j + 1)._1)) {
210+
// Find monotonicity violating sequence, if any.
211+
while (j < input.length - 1 && !monotonicityConstraintHolds(input(j)._1, input(j + 1)._1)) {
158212
j = j + 1
159213
}
160214

161-
// If monotonicity was not violated, move to next data point
215+
// If monotonicity was not violated, move to next data point.
162216
if (i == j) {
163217
i = i + 1
164218
} else {
165219
// Otherwise pool the violating sequence
166-
// And check if pooling caused monotonicity violation in previously processed points
167-
while (i >= 0 && !monotonicityConstraintHolds(in(i)._1, in(i + 1)._1)) {
168-
pool(in, i, j)
220+
// and check if pooling caused monotonicity violation in previously processed points.
221+
while (i >= 0 && !monotonicityConstraintHolds(input(i)._1, input(i + 1)._1)) {
222+
pool(input, i, j)
169223
i = i - 1
170224
}
171225

172226
i = j
173227
}
174228
}
175229

176-
in
230+
input
177231
}
178232

179233
/**
180-
* Performs parallel pool adjacent violators algorithm
181-
* Calls Pool adjacent violators on each partition and then again on the result
234+
* Performs parallel pool adjacent violators algorithm.
235+
* Performs the pool adjacent violators algorithm on each partition and then again on the result.
182236
*
183-
* @param testData input
184-
* @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
185-
* @return result
237+
* @param testData Input data of tuples (label, feature, weight).
238+
* @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
239+
* @return Result tuples (label, feature, weight) where labels were updated
240+
* to form a monotone sequence as per isotonic regression definition.
186241
*/
187242
private def parallelPoolAdjacentViolators(
188243
testData: RDD[(Double, Double, Double)],
@@ -194,45 +249,4 @@ class IsotonicRegression
194249

195250
poolAdjacentViolators(parallelStepResult.collect(), isotonic)
196251
}
197-
}
198-
199-
/**
200-
* Top-level methods for monotone regression (either isotonic or antitonic).
201-
*/
202-
object IsotonicRegression {
203-
204-
/**
205-
* Train a monotone regression model given an RDD of (label, feature, weight).
206-
* Label is the dependent y value
207-
* Weight of the data point is the number of measurements. Default is 1
208-
*
209-
* @param input RDD of (label, feature, weight).
210-
* Each point describes a row of the data
211-
* matrix A as well as the corresponding right hand side label y
212-
* and weight as number of measurements
213-
* @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
214-
*/
215-
def train(
216-
input: RDD[(Double, Double, Double)],
217-
isotonic: Boolean = true): IsotonicRegressionModel = {
218-
new IsotonicRegression().run(input, isotonic)
219-
}
220-
221-
/**
222-
* Train a monotone regression model given an RDD of (label, feature, weight).
223-
* Label is the dependent y value
224-
* Weight of the data point is the number of measurements. Default is 1
225-
*
226-
* @param input RDD of (label, feature, weight).
227-
* @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
228-
* @return
229-
*/
230-
def train(
231-
input: JavaRDD[(java.lang.Double, java.lang.Double, java.lang.Double)],
232-
isotonic: Boolean): IsotonicRegressionModel = {
233-
new IsotonicRegression()
234-
.run(
235-
input.rdd.asInstanceOf[RDD[(Double, Double, Double)]],
236-
isotonic)
237-
}
238-
}
252+
}

0 commit comments

Comments
 (0)