
[SPARK-22883] ML test for StructuredStreaming: spark.ml.feature, I-M #20964

Closed · wants to merge 4 commits
14 changes: 6 additions & 8 deletions mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
@@ -17,17 +17,15 @@

package org.apache.spark.ml.feature

import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
import org.apache.spark.ml.param.ParamsSuite
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.mllib.feature.{IDFModel => OldIDFModel}
import org.apache.spark.mllib.linalg.VectorImplicits._
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.sql.Row

class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
class IDFSuite extends MLTest with DefaultReadWriteTest {

import testImplicits._

@@ -57,7 +55,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
Vectors.dense(0.0, 1.0, 2.0, 3.0),
Vectors.sparse(numOfFeatures, Array(1), Array(1.0))
)
val numOfData = data.size
val numOfData = data.length
val idf = Vectors.dense(Array(0, 3, 1, 2).map { x =>
math.log((numOfData + 1.0) / (x + 1.0))
})
@@ -72,7 +70,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead

MLTestingUtils.checkCopyAndUids(idfEst, idfModel)

idfModel.transform(df).select("idfValue", "expected").collect().foreach {
testTransformer[(Vector, Vector)](df, idfModel, "idfValue", "expected") {
case Row(x: Vector, y: Vector) =>
assert(x ~== y absTol 1e-5, "Transformed vector is different with expected vector.")
}
@@ -85,7 +83,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
Vectors.dense(0.0, 1.0, 2.0, 3.0),
Vectors.sparse(numOfFeatures, Array(1), Array(1.0))
)
val numOfData = data.size
val numOfData = data.length
val idf = Vectors.dense(Array(0, 3, 1, 2).map { x =>
if (x > 0) math.log((numOfData + 1.0) / (x + 1.0)) else 0
})
@@ -99,7 +97,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
.setMinDocFreq(1)
.fit(df)

idfModel.transform(df).select("idfValue", "expected").collect().foreach {
testTransformer[(Vector, Vector)](df, idfModel, "idfValue", "expected") {
case Row(x: Vector, y: Vector) =>
assert(x ~== y absTol 1e-5, "Transformed vector is different with expected vector.")
}
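The testTransformer calls above come from the new MLTest base class, which is not part of this diff. As a rough sketch of what such a helper has to do (an illustration under assumptions, not the actual MLTest code), it runs the transformer over the same rows both as a batch DataFrame and through a streaming source, applying one row-level check to both results:

import org.apache.spark.ml.Transformer
import org.apache.spark.sql.{Encoder, Row, SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

// Hypothetical helper; assumes a SparkSession is available, as MLlibTestSparkContext provides.
def checkBatchAndStreaming[A : Encoder](
    data: Seq[A],
    inputColNames: Seq[String],
    transformer: Transformer,
    resultCols: Seq[String])(check: Row => Unit)(implicit spark: SparkSession): Unit = {
  import spark.implicits._

  // Batch path: ordinary transform, then collect and check each row.
  transformer.transform(data.toDF(inputColNames: _*))
    .select(resultCols.head, resultCols.tail: _*)
    .collect()
    .foreach(check)

  // Streaming path: feed the same rows through a MemoryStream into a memory sink.
  implicit val sqlContext: SQLContext = spark.sqlContext
  val source = MemoryStream[A]
  val streamDF = source.toDS().toDF(inputColNames: _*)
  val query = transformer.transform(streamDF)
    .select(resultCols.head, resultCols.tail: _*)
    .writeStream
    .format("memory")
    .queryName("transformed")
    .start()
  source.addData(data: _*)
  query.processAllAvailable()
  spark.table("transformed").collect().foreach(check)
  query.stop()
}

The actual testTransformer[(Vector, Vector)](df, idfModel, "idfValue", "expected") { case Row(x: Vector, y: Vector) => ... } calls above presumably do something similar, starting from an existing DataFrame instead of a raw Seq.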
mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala
@@ -16,13 +16,12 @@
*/
package org.apache.spark.ml.feature

import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.ml.util.DefaultReadWriteTest
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.SparkException
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
import org.apache.spark.mllib.util.TestingUtils._
import org.apache.spark.sql.{DataFrame, Row}

class ImputerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
class ImputerSuite extends MLTest with DefaultReadWriteTest {

test("Imputer for Double with default missing Value NaN") {
val df = spark.createDataFrame( Seq(
@@ -76,6 +75,28 @@ class ImputerSuite extends SparkFunSuite with MLlibTestSparkContext with Default
ImputerSuite.iterateStrategyTest(imputer, df)
}

test("Imputer should work with Structured Streaming") {
val localSpark = spark
import localSpark.implicits._
val df = Seq[(java.lang.Double, Double)](
(4.0, 4.0),
(10.0, 10.0),
(10.0, 10.0),
(Double.NaN, 8.0),
(null, 8.0)
).toDF("value", "expected_mean_value")
Contributor: Why does the "value" column use the java.lang.Double type?

Member (author): Since it needs to be nullable (see the sketch after this test).

val imputer = new Imputer()
.setInputCols(Array("value"))
.setOutputCols(Array("out"))
.setStrategy("mean")
val model = imputer.fit(df)
testTransformer[(java.lang.Double, Double)](df, model, "expected_mean_value", "out") {
case Row(exp: java.lang.Double, out: Double) =>
assert((exp.isNaN && out.isNaN) || (exp == out),
s"Imputed values differ. Expected: $exp, actual: $out")
}
}
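
To illustrate the nullability point from the review thread above (a standalone sketch, not part of this patch): Scala's primitive Double cannot hold null, so a column that must contain nulls has to be declared with the boxed java.lang.Double type, which the tuple encoder then marks as nullable.

// Assumes a SparkSession `spark` with `import spark.implicits._` in scope, as in the test.
// val bad: Double = null               // does not compile: primitive Double has no null
val withNulls = Seq[(java.lang.Double, Double)](
  (4.0, 4.0),
  (null, 8.0)                           // legal only because the first column is boxed
).toDF("value", "expected_mean_value")

withNulls.printSchema()
// Expected schema:
//  |-- value: double (nullable = true)
//  |-- expected_mean_value: double (nullable = false)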

test("Imputer throws exception when surrogate cannot be computed") {
val df = spark.createDataFrame( Seq(
(0, Double.NaN, 1.0, 1.0),
@@ -164,8 +185,6 @@ object ImputerSuite {
* @param df DataFrame with columns "id", "value", "expected_mean", "expected_median"
*/
def iterateStrategyTest(imputer: Imputer, df: DataFrame): Unit = {
val inputCols = imputer.getInputCols

Seq("mean", "median").foreach { strategy =>
imputer.setStrategy(strategy)
val model = imputer.fit(df)
mllib/src/test/scala/org/apache/spark/ml/feature/InteractionSuite.scala
@@ -19,15 +19,15 @@ package org.apache.spark.ml.feature

import scala.collection.mutable.ArrayBuilder

import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.SparkException
import org.apache.spark.ml.attribute._
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamsSuite
import org.apache.spark.ml.util.DefaultReadWriteTest
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col

class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
class InteractionSuite extends MLTest with DefaultReadWriteTest {

import testImplicits._

@@ -63,24 +63,25 @@ class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with Def

test("numeric interaction") {
val data = Seq(
(2, Vectors.dense(3.0, 4.0)),
(1, Vectors.dense(1.0, 5.0))
).toDF("a", "b")
(2, Vectors.dense(3.0, 4.0), Vectors.dense(6.0, 8.0)),
(1, Vectors.dense(1.0, 5.0), Vectors.dense(1.0, 5.0))
).toDF("a", "b", "expected")
val groupAttr = new AttributeGroup(
"b",
Array[Attribute](
NumericAttribute.defaultAttr.withName("foo"),
NumericAttribute.defaultAttr.withName("bar")))
val df = data.select(
col("a").as("a", NumericAttribute.defaultAttr.toMetadata()),
col("b").as("b", groupAttr.toMetadata()))
col("b").as("b", groupAttr.toMetadata()),
col("expected"))
val trans = new Interaction().setInputCols(Array("a", "b")).setOutputCol("features")
testTransformer[(Int, Vector, Vector)](df, trans, "features", "expected") {
case Row(features: Vector, expected: Vector) =>
assert(features === expected)
}

val res = trans.transform(df)
val expected = Seq(
(2, Vectors.dense(3.0, 4.0), Vectors.dense(6.0, 8.0)),
(1, Vectors.dense(1.0, 5.0), Vectors.dense(1.0, 5.0))
).toDF("a", "b", "features")
assert(res.collect() === expected.collect())
val attrs = AttributeGroup.fromStructField(res.schema("features"))
val expectedAttrs = new AttributeGroup(
"features",
@@ -92,9 +93,9 @@ class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with Def

test("nominal interaction") {
val data = Seq(
(2, Vectors.dense(3.0, 4.0)),
(1, Vectors.dense(1.0, 5.0))
).toDF("a", "b")
(2, Vectors.dense(3.0, 4.0), Vectors.dense(0, 0, 0, 0, 3, 4)),
(1, Vectors.dense(1.0, 5.0), Vectors.dense(0, 0, 1, 5, 0, 0))
).toDF("a", "b", "expected")
val groupAttr = new AttributeGroup(
"b",
Array[Attribute](
@@ -103,14 +104,15 @@ class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with Def
val df = data.select(
col("a").as(
"a", NominalAttribute.defaultAttr.withValues(Array("up", "down", "left")).toMetadata()),
col("b").as("b", groupAttr.toMetadata()))
col("b").as("b", groupAttr.toMetadata()),
col("expected"))
val trans = new Interaction().setInputCols(Array("a", "b")).setOutputCol("features")
testTransformer[(Int, Vector, Vector)](df, trans, "features", "expected") {
case Row(features: Vector, expected: Vector) =>
assert(features === expected)
}

val res = trans.transform(df)
val expected = Seq(
(2, Vectors.dense(3.0, 4.0), Vectors.dense(0, 0, 0, 0, 3, 4)),
(1, Vectors.dense(1.0, 5.0), Vectors.dense(0, 0, 1, 5, 0, 0))
).toDF("a", "b", "features")
assert(res.collect() === expected.collect())
val attrs = AttributeGroup.fromStructField(res.schema("features"))
val expectedAttrs = new AttributeGroup(
"features",
mllib/src/test/scala/org/apache/spark/ml/feature/MaxAbsScalerSuite.scala
@@ -14,15 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ml.feature

import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.sql.Row

class MaxAbsScalerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
class MaxAbsScalerSuite extends MLTest with DefaultReadWriteTest {

import testImplicits._

@@ -45,9 +44,10 @@ class MaxAbsScalerSuite extends SparkFunSuite with MLlibTestSparkContext with De
.setOutputCol("scaled")

val model = scaler.fit(df)
model.transform(df).select("expected", "scaled").collect()
.foreach { case Row(vector1: Vector, vector2: Vector) =>
assert(vector1.equals(vector2), s"MaxAbsScaler ut error: $vector2 should be $vector1")
testTransformer[(Vector, Vector)](df, model, "expected", "scaled") {
case Row(expectedVec: Vector, actualVec: Vector) =>
assert(expectedVec === actualVec,
s"MaxAbsScaler error: Expected $expectedVec but computed $actualVec")
}

MLTestingUtils.checkCopyAndUids(scaler, model)
mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala
@@ -17,14 +17,13 @@

package org.apache.spark.ml.feature

import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamsSuite
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.sql.Dataset
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.sql.{Dataset, Row}

class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {

class MinHashLSHSuite extends MLTest with DefaultReadWriteTest {

@transient var dataset: Dataset[_] = _

@@ -167,4 +166,20 @@ class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
assert(precision == 1.0)
assert(recall >= 0.7)
}

test("MinHashLSHModel.transform should work with Structured Streaming") {
val localSpark = spark
import localSpark.implicits._

val model = new MinHashLSHModel("mh", randCoefficients = Array((1, 0)))
model.set(model.inputCol, "keys")
testTransformer[Tuple1[Vector]](dataset.toDF(), model, "keys", model.getOutputCol) {
case Row(_: Vector, output: Seq[_]) =>
assert(output.length === model.randCoefficients.length)
// no AND-amplification yet: SPARK-18450, so each hash output is of length 1
output.foreach {
case hashOutput: Vector => assert(hashOutput.size === 1)
}
}
Contributor: Why not have an "expected" column here to compare with?

Member (author): I don't think that's necessary for testing that this works with Structured Streaming; I can't see how streaming would mess up the correctness of the algorithm. (A short note on why the result is deterministic follows this file's diff.)

}
}
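
As context for the review exchange above about skipping an "expected" column: each MinHash value is a pure per-row function of the input vector's active indices and the model's random coefficients, so batch and streaming execution cannot disagree. Roughly (a sketch based on the mllib implementation; the exact prime constant is an assumption), the hash for one (a, b) coefficient pair is:

// Sketch of the per-row MinHash computation for a single (a, b) coefficient pair.
// HASH_PRIME is assumed here; the real constant lives in MinHashLSH.
val HASH_PRIME = 2038074743L

def minHash(activeIndices: Seq[Int], a: Int, b: Int): Double =
  activeIndices.map { i => ((1 + i).toLong * a + b) % HASH_PRIME }.min.toDouble

// With randCoefficients = Array((1, 0)) as in the test, the model emits one hash
// vector per coefficient pair, each of length 1 (no AND-amplification yet, SPARK-18450).
minHash(Seq(0, 2, 5), a = 1, b = 0)   // 1.0: the minimum of 1, 3 and 6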
mllib/src/test/scala/org/apache/spark/ml/feature/MinMaxScalerSuite.scala
@@ -17,13 +17,11 @@

package org.apache.spark.ml.feature

import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.sql.Row

class MinMaxScalerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
class MinMaxScalerSuite extends MLTest with DefaultReadWriteTest {

import testImplicits._

@@ -48,9 +46,9 @@ class MinMaxScalerSuite extends SparkFunSuite with MLlibTestSparkContext with De
.setMax(5)

val model = scaler.fit(df)
model.transform(df).select("expected", "scaled").collect()
.foreach { case Row(vector1: Vector, vector2: Vector) =>
assert(vector1.equals(vector2), "Transformed vector is different with expected.")
testTransformer[(Vector, Vector)](df, model, "expected", "scaled") {
case Row(vector1: Vector, vector2: Vector) =>
assert(vector1 === vector2, "Transformed vector is different with expected.")
}

MLTestingUtils.checkCopyAndUids(scaler, model)
@@ -114,7 +112,7 @@ class MinMaxScalerSuite extends SparkFunSuite with MLlibTestSparkContext with De
val model = scaler.fit(df)
model.transform(df).select("expected", "scaled").collect()
.foreach { case Row(vector1: Vector, vector2: Vector) =>
assert(vector1.equals(vector2), "Transformed vector is different with expected.")
assert(vector1 === vector2, "Transformed vector is different with expected.")
}
}
}
mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala
@@ -84,7 +84,7 @@ class NGramSuite extends MLTest with DefaultReadWriteTest {

def testNGram(t: NGram, dataFrame: DataFrame): Unit = {
testTransformer[(Seq[String], Seq[String])](dataFrame, t, "nGrams", "wantedNGrams") {
case Row(actualNGrams : Seq[String], wantedNGrams: Seq[String]) =>
case Row(actualNGrams : Seq[_], wantedNGrams: Seq[_]) =>
Contributor: Just curious, why change Seq[String] to Seq[_]? Is it fixing some potential issue?

Member (author): String is not actually checked because of erasure, so IntelliJ complained with a style warning before this change (illustrated in the sketch below).

assert(actualNGrams === wantedNGrams)
}
}
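To illustrate the erasure point from the last review thread (a standalone sketch, not part of this patch): the element type in a pattern such as case xs: Seq[String] is erased at runtime, so the match succeeds for any Seq and the String annotation only earns an "unchecked" compiler warning; Seq[_] states what the runtime check can actually verify.

val anySeq: Any = Seq(1, 2, 3)

anySeq match {
  // Compiles with an "unchecked" warning: String is erased, so this case
  // matches a Seq[Int] at runtime just as well.
  case xs: Seq[String] => println(s"matched as Seq[String]: $xs")
  case _               => println("no match")
}
// Prints: matched as Seq[String]: List(1, 2, 3)

anySeq match {
  // Seq[_] is honest about what is actually checked at runtime.
  case xs: Seq[_] => println(s"matched as Seq[_]: $xs")
  case _          => println("no match")
}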