
[SPARK-20449][ML] Upgrade breeze version to 0.13.1 #17746


Closed
wants to merge 1 commit into from
1 change: 1 addition & 0 deletions LICENSE
@@ -297,3 +297,4 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(MIT License) RowsGroup (http://datatables.net/license/mit)
(MIT License) jsonFormatter (http://www.jqueryscript.net/other/jQuery-Plugin-For-Pretty-JSON-Formatting-jsonFormatter.html)
(MIT License) modernizr (https://github.com/Modernizr/Modernizr/blob/master/LICENSE)
(MIT License) machinist (https://github.com/typelevel/machinist)
10 changes: 5 additions & 5 deletions R/pkg/inst/tests/testthat/test_mllib_classification.R
@@ -288,18 +288,18 @@ test_that("spark.mlp", {
c(0, 0, 0, 0, 0, 5, 5, 5, 5, 5, 9, 9, 9, 9, 9))
mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
expect_equal(head(mlpPredictions$prediction, 10),
c("1.0", "1.0", "1.0", "1.0", "2.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
c("1.0", "1.0", "2.0", "1.0", "2.0", "1.0", "2.0", "2.0", "1.0", "0.0"))

model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2, initialWeights =
c(0.0, 0.0, 0.0, 0.0, 0.0, 5.0, 5.0, 5.0, 5.0, 5.0, 9.0, 9.0, 9.0, 9.0, 9.0))
mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
expect_equal(head(mlpPredictions$prediction, 10),
c("1.0", "1.0", "1.0", "1.0", "2.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
c("1.0", "1.0", "2.0", "1.0", "2.0", "1.0", "2.0", "2.0", "1.0", "0.0"))

model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2)
mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
expect_equal(head(mlpPredictions$prediction, 10),
c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "0.0", "2.0", "1.0", "0.0"))
c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "0.0", "0.0", "1.0", "0.0"))
Member

Why did the prediction results change in the R and Python test cases? The bugs fixed by this breeze upgrade are in LBFGS-B; in theory, the current optimizer only uses LBFGS, so the results should be the same.

Contributor Author

@yanboliang Apr 25, 2017

Yeah, it's weird. After in-depth investigation, I found that all of the failing tests run on very tiny datasets with very small maxIter values, which means they have not converged. I suspect some underlying breeze changes caused these failures, but I don't think it matters, since all tests against larger datasets pass.
Checking intermediate results mid-iteration doesn't make much sense; those intermediate results can be fragile and unstable. We should only check the final converged result, so I sent #17757 to update the relevant tests and make them robust.
I also ran PySpark LogisticRegression on a larger dataset against Spark built with breeze 0.12 and with breeze 0.13.1; both gave the same result within a reasonable tolerance:
For breeze 0.12:

>>> df = spark.read.format("libsvm").load("/Users/yliang/data/trunk4/spark/data/mllib/sample_multiclass_classification_data.txt")
>>> from pyspark.ml.classification import LogisticRegression
>>> mlor = LogisticRegression(maxIter=100, regParam=0.01, family="multinomial")
>>> mlorModel = mlor.fit(df)
>>> mlorModel.coefficientMatrix
DenseMatrix(3, 4, [1.0584, -1.8365, 3.2426, 3.6224, -2.1275, 2.8712, -2.8362, -2.5096, 1.069, -1.0347, -0.4064, -1.1128], 1)
>>> mlorModel.interceptVector
DenseVector([-1.1036, -0.5917, 1.6953])

For breeze 0.13.1:

>>> df = spark.read.format("libsvm").load("/Users/yliang/data/trunk4/spark/data/mllib/sample_multiclass_classification_data.txt")
>>> from pyspark.ml.classification import LogisticRegression
>>> mlor = LogisticRegression(maxIter=100, regParam=0.01, family="multinomial")
>>> mlorModel = mlor.fit(df)
>>> mlorModel.coefficientMatrix
DenseMatrix(3, 4, [1.0584, -1.8365, 3.2426, 3.6224, -2.1274, 2.8712, -2.8363, -2.5096, 1.069, -1.0347, -0.4064, -1.1128], 1)
>>> mlorModel.interceptVector
DenseVector([-1.1036, -0.5917, 1.6953])

Member

Small datasets can result in an ill-conditioned problem, which is not ideal for unit tests since they can be very unstable.

I wonder whether, with the real, larger datasets we use in the Scala unit tests, the number of iterations required for convergence under breeze 0.13.1 is the same as under breeze 0.12.
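
A minimal sketch of one way to check this, assuming a spark-shell session, the bundled binary sample data, and a binomial problem (the training summary may not expose iteration counts for multinomial models in this Spark version):

import org.apache.spark.ml.classification.LogisticRegression

// Fit the same model on the same data under breeze 0.12 and 0.13.1 builds
// and compare how many optimizer iterations each run took.
val training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val lr = new LogisticRegression().setMaxIter(100).setRegParam(0.01)
val model = lr.fit(training)
println(model.summary.totalIterations)       // iterations until convergence
println(model.summary.objectiveHistory.last) // final training objective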


# Test formula works well
df <- suppressWarnings(createDataFrame(iris))
@@ -310,8 +310,8 @@ test_that("spark.mlp", {
expect_equal(summary$numOfOutputs, 3)
expect_equal(summary$layers, c(4, 3))
expect_equal(length(summary$weights), 15)
expect_equal(head(summary$weights, 5), list(-1.1957257, -5.2693685, 7.4489734, -6.3751413,
-10.2376130), tolerance = 1e-6)
expect_equal(head(summary$weights, 5), list(-0.5793153, -4.652961, 6.216155, -6.649478,
-10.51147), tolerance = 1e-3)
})

test_that("spark.naiveBayes", {
12 changes: 7 additions & 5 deletions dev/deps/spark-deps-hadoop-2.6
@@ -19,8 +19,8 @@ avro-mapred-1.7.7-hadoop2.jar
base64-2.3.8.jar
bcprov-jdk15on-1.51.jar
bonecp-0.8.0.RELEASE.jar
breeze-macros_2.11-0.12.jar
breeze_2.11-0.12.jar
breeze-macros_2.11-0.13.1.jar
breeze_2.11-0.13.1.jar
calcite-avatica-1.2.0-incubating.jar
calcite-core-1.2.0-incubating.jar
calcite-linq4j-1.2.0-incubating.jar
@@ -129,6 +129,8 @@ libfb303-0.9.3.jar
libthrift-0.9.3.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
machinist_2.11-0.6.1.jar
macro-compat_2.11-1.1.1.jar
mail-1.4.7.jar
mesos-1.0.0-shaded-protobuf.jar
metrics-core-3.1.2.jar
@@ -162,13 +164,13 @@ scala-parser-combinators_2.11-1.0.4.jar
scala-reflect-2.11.8.jar
scala-xml_2.11-1.0.2.jar
scalap-2.11.8.jar
shapeless_2.11-2.0.0.jar
shapeless_2.11-2.3.2.jar
slf4j-api-1.7.16.jar
slf4j-log4j12-1.7.16.jar
snappy-0.2.jar
snappy-java-1.1.2.6.jar
spire-macros_2.11-0.7.4.jar
spire_2.11-0.7.4.jar
spire-macros_2.11-0.13.0.jar
spire_2.11-0.13.0.jar
stax-api-1.0-2.jar
stax-api-1.0.1.jar
stream-2.7.0.jar
12 changes: 7 additions & 5 deletions dev/deps/spark-deps-hadoop-2.7
@@ -19,8 +19,8 @@ avro-mapred-1.7.7-hadoop2.jar
base64-2.3.8.jar
bcprov-jdk15on-1.51.jar
bonecp-0.8.0.RELEASE.jar
breeze-macros_2.11-0.12.jar
breeze_2.11-0.12.jar
breeze-macros_2.11-0.13.1.jar
breeze_2.11-0.13.1.jar
calcite-avatica-1.2.0-incubating.jar
calcite-core-1.2.0-incubating.jar
calcite-linq4j-1.2.0-incubating.jar
@@ -130,6 +130,8 @@ libfb303-0.9.3.jar
libthrift-0.9.3.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
machinist_2.11-0.6.1.jar
macro-compat_2.11-1.1.1.jar
mail-1.4.7.jar
mesos-1.0.0-shaded-protobuf.jar
metrics-core-3.1.2.jar
@@ -163,13 +165,13 @@ scala-parser-combinators_2.11-1.0.4.jar
scala-reflect-2.11.8.jar
scala-xml_2.11-1.0.2.jar
scalap-2.11.8.jar
shapeless_2.11-2.0.0.jar
shapeless_2.11-2.3.2.jar
slf4j-api-1.7.16.jar
slf4j-log4j12-1.7.16.jar
snappy-0.2.jar
snappy-java-1.1.2.6.jar
spire-macros_2.11-0.7.4.jar
spire_2.11-0.7.4.jar
spire-macros_2.11-0.13.0.jar
spire_2.11-0.13.0.jar
stax-api-1.0-2.jar
stax-api-1.0.1.jar
stream-2.7.0.jar
@@ -894,10 +894,10 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine

private[regression] object Probit extends Link("probit") {

override def link(mu: Double): Double = dist.Gaussian(0.0, 1.0).icdf(mu)
override def link(mu: Double): Double = dist.Gaussian(0.0, 1.0).inverseCdf(mu)

override def deriv(mu: Double): Double = {
1.0 / dist.Gaussian(0.0, 1.0).pdf(dist.Gaussian(0.0, 1.0).icdf(mu))
1.0 / dist.Gaussian(0.0, 1.0).pdf(dist.Gaussian(0.0, 1.0).inverseCdf(mu))
}

override def unlink(eta: Double): Double = dist.Gaussian(0.0, 1.0).cdf(eta)
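
For context, this change follows breeze's API rename: in 0.13.x the standard-normal quantile function is exposed as inverseCdf (it was icdf in 0.12). A minimal sketch, assuming breeze 0.13.x on the classpath:

import breeze.stats.{distributions => dist}

// Probit link: map a mean in (0, 1) to the linear predictor and back.
val standardNormal = dist.Gaussian(0.0, 1.0)
val eta = standardNormal.inverseCdf(0.975)   // link(mu), roughly 1.96
val mu = standardNormal.cdf(eta)             // unlink(eta), back to ~0.975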
@@ -788,20 +788,14 @@ class DistributedLDAModel private[clustering] (
@Since("1.5.0")
def topTopicsPerDocument(k: Int): RDD[(Long, Array[Int], Array[Double])] = {
graph.vertices.filter(LDA.isDocumentVertex).map { case (docID, topicCounts) =>
// TODO: Remove work-around for the breeze bug.
// https://github.com/scalanlp/breeze/issues/561
val topIndices = if (k == topicCounts.length) {
Seq.range(0, k)
} else {
argtopk(topicCounts, k)
}
val topIndices = argtopk(topicCounts, k)
val sumCounts = sum(topicCounts)
val weights = if (sumCounts != 0) {
topicCounts(topIndices) / sumCounts
topicCounts(topIndices).toArray.map(_ / sumCounts)
} else {
topicCounts(topIndices)
topicCounts(topIndices).toArray
}
(docID.toLong, topIndices.toArray, weights.toArray)
(docID.toLong, topIndices.toArray, weights)
}
}

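Breeze 0.13 fixes scalanlp/breeze#561, so the k == topicCounts.length workaround is no longer needed and argtopk can be used directly. A minimal sketch of the new code path with hypothetical counts, assuming breeze 0.13.x:

import breeze.linalg.{DenseVector, argtopk, sum}

// Hypothetical per-document topic counts.
val topicCounts = DenseVector(3.0, 1.0, 5.0)
val topIndices = argtopk(topicCounts, topicCounts.length)   // indices ordered by count
val sumCounts = sum(topicCounts)
val weights =
  if (sumCounts != 0) topicCounts(topIndices).toArray.map(_ / sumCounts)
  else topicCounts(topIndices).toArray
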
@@ -191,8 +191,8 @@ class LBFGSSuite extends SparkFunSuite with MLlibTestSparkContext with Matchers
// With smaller convergenceTol, it takes more steps.
assert(lossLBFGS3.length > lossLBFGS2.length)

// Based on observation, lossLBFGS2 runs 5 iterations, no theoretically guaranteed.
assert(lossLBFGS3.length == 6)
// Based on observation, lossLBFGS3 runs 7 iterations, no theoretically guaranteed.
assert(lossLBFGS3.length == 7)
assert((lossLBFGS3(4) - lossLBFGS3(5)) / lossLBFGS3(4) < convergenceTol)
}

2 changes: 1 addition & 1 deletion pom.xml
@@ -658,7 +658,7 @@
<dependency>
<groupId>org.scalanlp</groupId>
<artifactId>breeze_${scala.binary.version}</artifactId>
<version>0.12</version>
<version>0.13.1</version>
<exclusions>
<!-- This is included as a compile-scoped dependency by jtransforms, which is
a dependency of breeze. -->
18 changes: 8 additions & 10 deletions python/pyspark/ml/classification.py
@@ -190,30 +190,28 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti
>>> blor = LogisticRegression(maxIter=5, regParam=0.01, weightCol="weight")
>>> blorModel = blor.fit(bdf)
>>> blorModel.coefficients
DenseVector([5.5...])
DenseVector([5.4...])
>>> blorModel.intercept
-2.68...
-2.63...
>>> mdf = sc.parallelize([
... Row(label=1.0, weight=2.0, features=Vectors.dense(1.0)),
... Row(label=0.0, weight=2.0, features=Vectors.sparse(1, [], [])),
... Row(label=2.0, weight=2.0, features=Vectors.dense(3.0))]).toDF()
>>> mlor = LogisticRegression(maxIter=5, regParam=0.01, weightCol="weight",
... family="multinomial")
>>> mlorModel = mlor.fit(mdf)
>>> print(mlorModel.coefficientMatrix)
DenseMatrix([[-2.3...],
[ 0.2...],
[ 2.1... ]])
>>> mlorModel.coefficientMatrix
DenseMatrix(3, 1, [-2.3..., 0.2..., 2.1...], 1)
>>> mlorModel.interceptVector
DenseVector([2.0..., 0.8..., -2.8...])
DenseVector([2.1..., 0.6..., -2.8...])
>>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0))]).toDF()
>>> result = blorModel.transform(test0).head()
>>> result.prediction
0.0
>>> result.probability
DenseVector([0.99..., 0.00...])
>>> result.rawPrediction
DenseVector([8.22..., -8.22...])
DenseVector([8.12..., -8.12...])
>>> test1 = sc.parallelize([Row(features=Vectors.sparse(1, [0], [1.0]))]).toDF()
>>> blorModel.transform(test1).head().prediction
1.0
@@ -1490,9 +1488,9 @@ class OneVsRest(Estimator, OneVsRestParams, MLReadable, MLWritable):
>>> ovr = OneVsRest(classifier=lr)
>>> model = ovr.fit(df)
>>> [x.coefficients for x in model.models]
[DenseVector([3.3925, 1.8785]), DenseVector([-4.3016, -6.3163]), DenseVector([-4.5855, 6.1785])]
[DenseVector([4.9791, 2.426]), DenseVector([-4.1198, -5.9326]), DenseVector([-3.314, 5.2423])]
>>> [x.intercept for x in model.models]
[-3.64747..., 2.55078..., -1.10165...]
[-5.06544..., 2.30341..., -1.29133...]
>>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0, 0.0))]).toDF()
>>> model.transform(test0).head().prediction
1.0