
[SPARK-18060][ML] Avoid unnecessary computation for MLOR #15593


Closed

wants to merge 6 commits
@@ -438,18 +438,14 @@ class LogisticRegression @Since("1.2.0") (
val standardizationParam = $(standardization)
def regParamL1Fun = (index: Int) => {
// Remove the L1 penalization on the intercept
val isIntercept = $(fitIntercept) && ((index + 1) % numFeaturesPlusIntercept == 0)
val isIntercept = $(fitIntercept) && index >= numFeatures * numCoefficientSets
if (isIntercept) {
0.0
} else {
if (standardizationParam) {
regParamL1
} else {
val featureIndex = if ($(fitIntercept)) {
index % numFeaturesPlusIntercept
} else {
index % numFeatures
}
val featureIndex = index / numCoefficientSets
// If `standardization` is false, we still standardize the data
// to improve the rate of convergence; as a result, we have to
// perform this reverse standardization by penalizing each component
@@ -466,6 +462,15 @@ class LogisticRegression @Since("1.2.0") (
new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
}

/*
The coefficients are laid out in column major order during training. e.g. for
`numClasses = 3` and `numFeatures = 2` and `fitIntercept = true` the layout is:

Array(beta_11, beta_21, beta_31, beta_12, beta_22, beta_32, intercept_1, intercept_2,
intercept_3)

where beta_jk corresponds to the coefficient for class `j` and feature `k`.
*/
val initialCoefficientsWithIntercept =
Vectors.zeros(numCoefficientSets * numFeaturesPlusIntercept)

@@ -489,13 +494,14 @@ class LogisticRegression @Since("1.2.0") (
val initialCoefWithInterceptArray = initialCoefficientsWithIntercept.toArray
Contributor:

Can you document on line 465 the layout of the data, to help future developers (all the coefficients, in column major order, maybe followed by a last column that contains the intercepts)?

Contributor Author:

I added a comment, not sure if it's exactly what you had in mind. What do you think?

Member:

It's harder to reason about the code when we use column major order in MLOR, and it's not consistent with the rest of the linear models. Do you think we can still use row major as much as possible so we don't need to touch the code here, do the column major layout only internally in LogisticAggregator, and convert back from column major to row major when the gradient of LogisticAggregator is called?
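
For illustration, a minimal sketch of the kind of column-major-to-row-major conversion that would imply, assuming plain arrays and no intercept column (the helper name is hypothetical, not from this PR):

    // Hypothetical helper: copy a column-major coefficient array into row-major order.
    // Column major keeps the entries for one feature (across all classes) contiguous;
    // row major keeps the entries for one class (across all features) contiguous.
    def colMajorToRowMajor(
        colMajor: Array[Double],
        numClasses: Int,
        numFeatures: Int): Array[Double] = {
      val rowMajor = new Array[Double](colMajor.length)
      var featureIndex = 0
      while (featureIndex < numFeatures) {
        var classIndex = 0
        while (classIndex < numClasses) {
          rowMajor(classIndex * numFeatures + featureIndex) =
            colMajor(featureIndex * numClasses + classIndex)
          classIndex += 1
        }
        featureIndex += 1
      }
      rowMajor
    }

Doing that on every gradient call would add an extra O(numClasses * numFeatures) copy per iteration.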

Contributor Author:

Are there other linear models that use a matrix of coefficients? I agree that thinking about indexing when flattening a matrix into an array is a pain, but I don't really see how column major is more difficult than row major. The only place this would make much difference is that we wouldn't have to modify the L2 reg update logic and the initial coefficients, but we would still need to swap between layouts on every iteration. I guess I don't see that this is obviously simpler.

And for the review of this PR, I think we can be confident of the correctness because of the robustness of the LogisticRegression test suite, which has extensive correctness tests.

Member:

When we loop over the coefficients of LiR or BLOR as a flattened array, people think about it as looping in the direction of the features. In this PR the logic is changed, which can lead to maintenance issues when developers work on this code later; for example, I had difficulty understanding the code at first glance. Alternatively, for the L1/L2 update logic and the initial coefficients, we could put them into a col major matrix and use foreachActive with explicit col and row indices, so readers can easily follow the logic.
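
A rough sketch of that alternative (dimensions and values are made up, not code from this PR, and explicit loops stand in for foreachActive), using the column-major DenseMatrix from ml.linalg:

    import org.apache.spark.ml.linalg.DenseMatrix

    // Hypothetical sketch: view the flat coefficients as a column-major matrix with
    // one row per class and one column per feature, and address entries by explicit
    // (class, feature) indices instead of modular arithmetic over a flat index.
    val numClasses = 3
    val numFeatures = 2
    val flatCoefficients = Array.fill(numClasses * numFeatures)(0.1) // column major

    // DenseMatrix(numRows, numCols, values) interprets `values` in column-major order.
    val coefMatrix = new DenseMatrix(numClasses, numFeatures, flatCoefficients)

    var featureIndex = 0
    while (featureIndex < numFeatures) {
      var classIndex = 0
      while (classIndex < numClasses) {
        val coef = coefMatrix(classIndex, featureIndex)
        // an L1/L2 update for this (class, feature) entry would go here
        println(s"class=$classIndex feature=$featureIndex coef=$coef")
        classIndex += 1
      }
      featureIndex += 1
    }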

Contributor:

By the way, for LiR or binary LoR, row-major and col-major will both loop "in the direction of features", since it doesn't matter whether the coefficients form a "row vector" or a "column vector", no?

val providedCoef = optInitialModel.get.coefficientMatrix
providedCoef.foreachActive { (row, col, value) =>
val flatIndex = row * numFeaturesPlusIntercept + col
// convert matrix to column major for training
Contributor:

Is this code path ever tested, since the current initialModel is just a sort of dummy placeholder that is always None?

Contributor Author:

There are unit tests for it. Also, this is used by the old mllib code, which exposed an initial model and now wraps ML.

val flatIndex = col * numCoefficientSets + row
// We need to scale the coefficients since they will be trained in the scaled space
initialCoefWithInterceptArray(flatIndex) = value * featuresStd(col)
}
if ($(fitIntercept)) {
optInitialModel.get.interceptVector.foreachActive { (index, value) =>
val coefIndex = (index + 1) * numFeaturesPlusIntercept - 1
val coefIndex = numCoefficientSets * numFeatures + index
initialCoefWithInterceptArray(coefIndex) = value
}
}
@@ -526,7 +532,7 @@ class LogisticRegression @Since("1.2.0") (
val rawIntercepts = histogram.map(c => math.log(c + 1)) // add 1 for smoothing
val rawMean = rawIntercepts.sum / rawIntercepts.length
rawIntercepts.indices.foreach { i =>
initialCoefficientsWithIntercept.toArray(i * numFeaturesPlusIntercept + numFeatures) =
initialCoefficientsWithIntercept.toArray(numClasses * numFeatures + i) =
rawIntercepts(i) - rawMean
}
} else if ($(fitIntercept)) {
@@ -572,16 +578,20 @@ class LogisticRegression @Since("1.2.0") (
/*
The coefficients are trained in the scaled space; we're converting them back to
the original space.

Additionally, since the coefficients were laid out in column major order during training
to avoid extra computation, we convert them back to row major before passing them to the
model.

Note that the intercept in scaled space and original space is the same;
as a result, no scaling is needed.
*/
val rawCoefficients = state.x.toArray.clone()
val coefficientArray = Array.tabulate(numCoefficientSets * numFeatures) { i =>
// flatIndex will loop though rawCoefficients, and skip the intercept terms.
val flatIndex = if ($(fitIntercept)) i + i / numFeatures else i
val colMajorIndex = (i % numFeatures) * numCoefficientSets + i / numFeatures
val featureIndex = i % numFeatures
if (featuresStd(featureIndex) != 0.0) {
rawCoefficients(flatIndex) / featuresStd(featureIndex)
rawCoefficients(colMajorIndex) / featuresStd(featureIndex)
} else {
0.0
}
@@ -618,7 +628,7 @@ class LogisticRegression @Since("1.2.0") (

val interceptsArray: Array[Double] = if ($(fitIntercept)) {
Array.tabulate(numCoefficientSets) { i =>
val coefIndex = (i + 1) * numFeaturesPlusIntercept - 1
val coefIndex = numFeatures * numCoefficientSets + i
rawCoefficients(coefIndex)
}
} else {
@@ -697,6 +707,7 @@ class LogisticRegressionModel private[spark] (
/**
* A vector of model coefficients for "binomial" logistic regression. If this model was trained
* using the "multinomial" family then an exception is thrown.
*
* @return Vector
*/
@Since("2.0.0")
@@ -720,6 +731,7 @@ class LogisticRegressionModel private[spark] (
/**
* The model intercept for "binomial" logistic regression. If this model was fit with the
* "multinomial" family then an exception is thrown.
*
* @return Double
*/
@Since("1.3.0")
@@ -1389,6 +1401,12 @@ class BinaryLogisticRegressionSummary private[classification] (
* $$
* </blockquote></p>
*
* @note In order to avoid unnecessary computation during calculation of the gradient updates
* we lay out the coefficients in column major order during training. This allows us to
* perform feature standardization once, while still retaining sequential memory access
* for speed. We convert back to row major order when we create the model,
* since this form is optimal for the matrix operations used for prediction.
*
* @param bcCoefficients The broadcast coefficients corresponding to the features.
* @param bcFeaturesStd The broadcast standard deviation values of the features.
* @param numClasses the number of possible outcomes for k classes classification problem in
@@ -1486,57 +1504,65 @@ private class LogisticAggregator(
var marginOfLabel = 0.0
var maxMargin = Double.NegativeInfinity

val margins = Array.tabulate(numClasses) { i =>
var margin = 0.0
features.foreachActive { (index, value) =>
if (localFeaturesStd(index) != 0.0 && value != 0.0) {
margin += localCoefficients(i * numFeaturesPlusIntercept + index) *
value / localFeaturesStd(index)
}
val margins = new Array[Double](numClasses)
Contributor:

It may be good to just add a general comment (perhaps in LogisticAggregator) about the training being done in col major order, and about this being converted to row major once training is done, etc. And perhaps a little detail on why it is (was) necessary.

Contributor Author:

Ok, I added a comment inside of LogisticRegression.train above. Do you think it needs to be moved or just duplicated inside LogisticAggregator?

Contributor:

I think a slightly more detailed comment would be good for the aggregator, something like the following (please feel free to make it more clear):

In order to avoid unnecessary computation during calculation of the gradient updates,
we lay out the coefficients in column major order during training. This allows us to 
perform feature standardization once, while still retaining sequential memory access 
for speed. We convert back to row-major order when we create the model, 
since this form is optimal for the matrix operations used for prediction.

Here we can amend slightly to say:

Additionally, since the coefficients were laid out in column major order during training to
avoid extra computation, we convert them back to row major before passing them to the model.

features.foreachActive { (index, value) =>
val stdValue = value / localFeaturesStd(index)
var j = 0
while (j < numClasses) {
margins(j) += localCoefficients(index * numClasses + j) * stdValue
j += 1
}

}
var i = 0
while (i < numClasses) {
if (fitIntercept) {
margin += localCoefficients(i * numFeaturesPlusIntercept + numFeatures)
margins(i) += localCoefficients(numClasses * numFeatures + i)
}
if (i == label.toInt) marginOfLabel = margin
if (margin > maxMargin) {
maxMargin = margin
if (i == label.toInt) marginOfLabel = margins(i)
if (margins(i) > maxMargin) {
maxMargin = margins(i)
}
margin
i += 1
}

/**
* When maxMargin > 0, the original formula could cause overflow.
* We address this by subtracting maxMargin from all the margins, so it's guaranteed
* that all of the new margins will be smaller than zero to prevent arithmetic overflow.
*/
val multipliers = new Array[Double](numClasses)
val sum = {
var temp = 0.0
if (maxMargin > 0) {
for (i <- 0 until numClasses) {
margins(i) -= maxMargin
temp += math.exp(margins(i))
}
} else {
for (i <- 0 until numClasses) {
temp += math.exp(margins(i))
}
var i = 0
while (i < numClasses) {
if (maxMargin > 0) margins(i) -= maxMargin
val exp = math.exp(margins(i))
temp += exp
multipliers(i) = exp
i += 1
}
temp
}

for (i <- 0 until numClasses) {
val multiplier = math.exp(margins(i)) / sum - {
if (label == i) 1.0 else 0.0
}
features.foreachActive { (index, value) =>
if (localFeaturesStd(index) != 0.0 && value != 0.0) {
localGradientArray(i * numFeaturesPlusIntercept + index) +=
weight * multiplier * value / localFeaturesStd(index)
margins.indices.foreach { i =>
multipliers(i) = multipliers(i) / sum - (if (label == i) 1.0 else 0.0)
}
features.foreachActive { (index, value) =>
if (localFeaturesStd(index) != 0.0 && value != 0.0) {
val stdValue = value / localFeaturesStd(index)
var j = 0
while (j < numClasses) {
localGradientArray(index * numClasses + j) +=
weight * multipliers(j) * stdValue
j += 1
}
}
if (fitIntercept) {
localGradientArray(i * numFeaturesPlusIntercept + numFeatures) += weight * multiplier
}
if (fitIntercept) {
var i = 0
while (i < numClasses) {
localGradientArray(numFeatures * numClasses + i) += weight * multipliers(i)
i += 1
}
}

Member:

You can make def gradient: Vector return a Matrix instead, and for MLOR the implementation can be a col major matrix, so when we use foreachActive we don't need to worry about the underlying layout.
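
A minimal sketch of that idea with hypothetical names (the real aggregator keeps its gradient in a flat array):

    import org.apache.spark.ml.linalg.{DenseMatrix, Matrix}

    // Hypothetical aggregator fragment: keep the running gradient in a flat
    // column-major array during training, but expose it to callers as a Matrix so
    // they never have to reason about the flat layout.
    class HypotheticalAggregator(numClasses: Int, numFeatures: Int, fitIntercept: Boolean) {
      private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures
      private val gradientSumArray = new Array[Double](numClasses * numFeaturesPlusIntercept)

      // DenseMatrix stores `values` column major, matching the internal layout:
      // one row per class, one column per feature (plus a final column of intercepts).
      def gradientMatrix: Matrix =
        new DenseMatrix(numClasses, numFeaturesPlusIntercept, gradientSumArray.clone())
    }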

Contributor:

I'm not sure I fully get where you intend to use foreachActive over the gradient matrix? Maybe it's the location of this comment that is confusing me ...

... but here in multinomialUpdateInPlace, we are iterating over features using foreachActive, then for each feature iterating over numClasses. If we iterate over the gradient using foreachActive, how will that work? Won't it be super inefficient? Perhaps I am missing something about what you intend; could you clarify with an example?

Contributor @MLnick (Nov 11, 2016):

My (perhaps incorrect) understanding of what you are saying:

"The aggregator should internally convert to col-major during training & in-place update and from gradient will return a Matrix that can be used during L2 update (cycle through using foreachActive) in L1678-L1716."

But to me that would require swapping coeffs.foreachActive for gradient.foreachActive in L1684. Since coeff is itself a vector (which we can't really change due to BreezeDiffFunction, and we laid it out col-major for training) we would need to index back into that using the same col-major indexing anyway, no? So I don't see that we gain anything for L2 reg. Also, if we change back and forth every iteration that seems like it would be inefficient.

For the L1 reg function, that is purely index-based, so val isIntercept = $(fitIntercept) && ((index + 1) % numFeaturesPlusIntercept == 0) doesn't seem any more or less complex than val isIntercept = $(fitIntercept) && index >= numFeatures * numCoefficientSets. So I don't think we gain anything in the L1 reg function either.

Finally, during initial coefficients setup, we already loop through them if they are provided. If not, the intercept adjustments are the only things that occur (the rest are zeros), so that is just indexing into the intercepts, which doesn't seem to me to be a major issue whether row- or col-major.

I tend to agree that the col-major indexing is no "more complex" than row-major. It's also commented fairly clearly why and where we lay it out col-major (with an example of the form), and where we convert back to row-major. We need to convert back to row-major outside of the aggregator because the coefficients are returned from optimizer.states.x, which is Breeze-specific, not from LogisticAggregator or even LogisticCostFun.
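
As a concrete check (a hypothetical snippet, not part of the PR), here are the flat positions each intercept test flags for numFeatures = 2 and three coefficient sets:

    // Hypothetical check of the two intercept tests quoted above.
    val numFeatures = 2
    val numCoefficientSets = 3
    val numFeaturesPlusIntercept = numFeatures + 1
    val totalSize = numCoefficientSets * numFeaturesPlusIntercept

    // Row-major layout: each class's row ends with its intercept.
    val rowMajorIntercepts =
      (0 until totalSize).filter(index => (index + 1) % numFeaturesPlusIntercept == 0)

    // Column-major layout: all the intercepts sit together at the tail of the array.
    val colMajorIntercepts =
      (0 until totalSize).filter(index => index >= numFeatures * numCoefficientSets)

    println(rowMajorIntercepts) // Vector(2, 5, 8)
    println(colMajorIntercepts) // Vector(6, 7, 8)

Both tests isolate one intercept per coefficient set; only the positions differ.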

@@ -1637,6 +1663,7 @@ private class LogisticCostFun(
val bcCoeffs = instances.context.broadcast(coeffs)
val featuresStd = bcFeaturesStd.value
val numFeatures = featuresStd.length
val numCoefficientSets = if (multinomial) numClasses else 1

val logisticAggregator = {
val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance)
@@ -1656,7 +1683,7 @@ private class LogisticCostFun(
var sum = 0.0
coeffs.foreachActive { case (index, value) =>
// We do not apply regularization to the intercepts
val isIntercept = fitIntercept && ((index + 1) % (numFeatures + 1) == 0)
val isIntercept = fitIntercept && index >= numCoefficientSets * numFeatures
if (!isIntercept) {
// The following code will compute the loss of the regularization; also
// the gradient of the regularization, and add back to totalGradientArray.
@@ -1665,11 +1692,7 @@ private class LogisticCostFun(
totalGradientArray(index) += regParamL2 * value
value * value
} else {
val featureIndex = if (fitIntercept) {
index % (numFeatures + 1)
} else {
index % numFeatures
}
val featureIndex = index / numCoefficientSets
if (featuresStd(featureIndex) != 0.0) {
// If `standardization` is false, we still standardize the data
// to improve the rate of convergence; as a result, we have to