
Commit

Merge branch 'master' of https://github.com/apache/spark into apache-master
Asif Shahid committed Dec 9, 2020
2 parents ab4d286 + 29fed23 commit 91d5d3f
Showing 12 changed files with 181 additions and 60 deletions.
16 changes: 8 additions & 8 deletions dev/deps/spark-deps-hadoop-2.7-hive-2.3
@@ -103,17 +103,17 @@ httpclient/4.5.13//httpclient-4.5.13.jar
httpcore/4.4.12//httpcore-4.4.12.jar
istack-commons-runtime/3.0.8//istack-commons-runtime-3.0.8.jar
ivy/2.4.0//ivy-2.4.0.jar
-jackson-annotations/2.10.0//jackson-annotations-2.10.0.jar
+jackson-annotations/2.10.5//jackson-annotations-2.10.5.jar
jackson-core-asl/1.9.13//jackson-core-asl-1.9.13.jar
-jackson-core/2.10.0//jackson-core-2.10.0.jar
-jackson-databind/2.10.0//jackson-databind-2.10.0.jar
-jackson-dataformat-yaml/2.10.0//jackson-dataformat-yaml-2.10.0.jar
+jackson-core/2.10.5//jackson-core-2.10.5.jar
+jackson-databind/2.10.5.1//jackson-databind-2.10.5.1.jar
+jackson-dataformat-yaml/2.10.5//jackson-dataformat-yaml-2.10.5.jar
jackson-datatype-jsr310/2.11.2//jackson-datatype-jsr310-2.11.2.jar
jackson-jaxrs/1.9.13//jackson-jaxrs-1.9.13.jar
jackson-mapper-asl/1.9.13//jackson-mapper-asl-1.9.13.jar
-jackson-module-jaxb-annotations/2.10.0//jackson-module-jaxb-annotations-2.10.0.jar
-jackson-module-paranamer/2.10.0//jackson-module-paranamer-2.10.0.jar
-jackson-module-scala_2.12/2.10.0//jackson-module-scala_2.12-2.10.0.jar
+jackson-module-jaxb-annotations/2.10.5//jackson-module-jaxb-annotations-2.10.5.jar
+jackson-module-paranamer/2.10.5//jackson-module-paranamer-2.10.5.jar
+jackson-module-scala_2.12/2.10.5//jackson-module-scala_2.12-2.10.5.jar
jackson-xc/1.9.13//jackson-xc-1.9.13.jar
jakarta.activation-api/1.2.1//jakarta.activation-api-1.2.1.jar
jakarta.annotation-api/1.3.5//jakarta.annotation-api-1.3.5.jar
@@ -220,7 +220,7 @@ shapeless_2.12/2.3.3//shapeless_2.12-2.3.3.jar
shims/0.9.0//shims-0.9.0.jar
slf4j-api/1.7.30//slf4j-api-1.7.30.jar
slf4j-log4j12/1.7.30//slf4j-log4j12-1.7.30.jar
-snakeyaml/1.24//snakeyaml-1.24.jar
+snakeyaml/1.26//snakeyaml-1.26.jar
snappy-java/1.1.8//snappy-java-1.1.8.jar
spire-macros_2.12/0.17.0-M1//spire-macros_2.12-0.17.0-M1.jar
spire-platform_2.12/0.17.0-M1//spire-platform_2.12-0.17.0-M1.jar
16 changes: 8 additions & 8 deletions dev/deps/spark-deps-hadoop-3.2-hive-2.3
@@ -102,18 +102,18 @@ httpclient/4.5.13//httpclient-4.5.13.jar
httpcore/4.4.12//httpcore-4.4.12.jar
istack-commons-runtime/3.0.8//istack-commons-runtime-3.0.8.jar
ivy/2.4.0//ivy-2.4.0.jar
-jackson-annotations/2.10.0//jackson-annotations-2.10.0.jar
+jackson-annotations/2.10.5//jackson-annotations-2.10.5.jar
jackson-core-asl/1.9.13//jackson-core-asl-1.9.13.jar
-jackson-core/2.10.0//jackson-core-2.10.0.jar
-jackson-databind/2.10.0//jackson-databind-2.10.0.jar
-jackson-dataformat-yaml/2.10.0//jackson-dataformat-yaml-2.10.0.jar
+jackson-core/2.10.5//jackson-core-2.10.5.jar
+jackson-databind/2.10.5.1//jackson-databind-2.10.5.1.jar
+jackson-dataformat-yaml/2.10.5//jackson-dataformat-yaml-2.10.5.jar
jackson-datatype-jsr310/2.11.2//jackson-datatype-jsr310-2.11.2.jar
jackson-jaxrs-base/2.9.5//jackson-jaxrs-base-2.9.5.jar
jackson-jaxrs-json-provider/2.9.5//jackson-jaxrs-json-provider-2.9.5.jar
jackson-mapper-asl/1.9.13//jackson-mapper-asl-1.9.13.jar
-jackson-module-jaxb-annotations/2.10.0//jackson-module-jaxb-annotations-2.10.0.jar
-jackson-module-paranamer/2.10.0//jackson-module-paranamer-2.10.0.jar
-jackson-module-scala_2.12/2.10.0//jackson-module-scala_2.12-2.10.0.jar
+jackson-module-jaxb-annotations/2.10.5//jackson-module-jaxb-annotations-2.10.5.jar
+jackson-module-paranamer/2.10.5//jackson-module-paranamer-2.10.5.jar
+jackson-module-scala_2.12/2.10.5//jackson-module-scala_2.12-2.10.5.jar
jakarta.activation-api/1.2.1//jakarta.activation-api-1.2.1.jar
jakarta.annotation-api/1.3.5//jakarta.annotation-api-1.3.5.jar
jakarta.inject/2.6.1//jakarta.inject-2.6.1.jar
@@ -235,7 +235,7 @@ shapeless_2.12/2.3.3//shapeless_2.12-2.3.3.jar
shims/0.9.0//shims-0.9.0.jar
slf4j-api/1.7.30//slf4j-api-1.7.30.jar
slf4j-log4j12/1.7.30//slf4j-log4j12-1.7.30.jar
-snakeyaml/1.24//snakeyaml-1.24.jar
+snakeyaml/1.26//snakeyaml-1.26.jar
snappy-java/1.1.8//snappy-java-1.1.8.jar
spire-macros_2.12/0.17.0-M1//spire-macros_2.12-0.17.0-M1.jar
spire-platform_2.12/0.17.0-M1//spire-platform_2.12-0.17.0-M1.jar
@@ -1548,9 +1548,9 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest {
val interceptsExpected1 = Vectors.dense(
1.0000152482448372, 3.591773288423673, 5.079685953744937)

-checkCoefficientsEquivalent(model1.coefficientMatrix, coefficientsExpected1)
+checkBoundedMLORCoefficientsEquivalent(model1.coefficientMatrix, coefficientsExpected1)
assert(model1.interceptVector ~== interceptsExpected1 relTol 0.01)
-checkCoefficientsEquivalent(model2.coefficientMatrix, coefficientsExpected1)
+checkBoundedMLORCoefficientsEquivalent(model2.coefficientMatrix, coefficientsExpected1)
assert(model2.interceptVector ~== interceptsExpected1 relTol 0.01)

// Bound constrained optimization with bound on both side.
@@ -1585,9 +1585,9 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest {
isTransposed = true)
val interceptsExpected3 = Vectors.dense(1.0, 2.0, 2.0)

-checkCoefficientsEquivalent(model3.coefficientMatrix, coefficientsExpected3)
+checkBoundedMLORCoefficientsEquivalent(model3.coefficientMatrix, coefficientsExpected3)
assert(model3.interceptVector ~== interceptsExpected3 relTol 0.01)
-checkCoefficientsEquivalent(model4.coefficientMatrix, coefficientsExpected3)
+checkBoundedMLORCoefficientsEquivalent(model4.coefficientMatrix, coefficientsExpected3)
assert(model4.interceptVector ~== interceptsExpected3 relTol 0.01)

// Bound constrained optimization with infinite bound on both side.
@@ -1621,9 +1621,9 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest {
val interceptsExpected5 = Vectors.dense(
-2.2231282183460723, 0.3669496747012527, 1.856178543644802)

-checkCoefficientsEquivalent(model5.coefficientMatrix, coefficientsExpected5)
+checkBoundedMLORCoefficientsEquivalent(model5.coefficientMatrix, coefficientsExpected5)
assert(model5.interceptVector ~== interceptsExpected5 relTol 0.01)
-checkCoefficientsEquivalent(model6.coefficientMatrix, coefficientsExpected5)
+checkBoundedMLORCoefficientsEquivalent(model6.coefficientMatrix, coefficientsExpected5)
assert(model6.interceptVector ~== interceptsExpected5 relTol 0.01)
}

@@ -1719,9 +1719,9 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest {
1.7524631428961193, 1.2292565990448736, 1.3433784431904323, 1.5846063017678864),
isTransposed = true)

-checkCoefficientsEquivalent(model1.coefficientMatrix, coefficientsExpected)
+checkBoundedMLORCoefficientsEquivalent(model1.coefficientMatrix, coefficientsExpected)
assert(model1.interceptVector.toArray === Array.fill(3)(0.0))
-checkCoefficientsEquivalent(model2.coefficientMatrix, coefficientsExpected)
+checkBoundedMLORCoefficientsEquivalent(model2.coefficientMatrix, coefficientsExpected)
assert(model2.interceptVector.toArray === Array.fill(3)(0.0))
}

@@ -2953,16 +2953,17 @@ object LogisticRegressionSuite {
}

/**
+* Note: This method is only used in Bounded MLOR (without regularization) test
* When no regularization is applied, the multinomial coefficients lack identifiability
* because we do not use a pivot class. We can add any constant value to the coefficients
* and get the same likelihood. If fitting under bound constrained optimization, we don't
* choose the mean centered coefficients like what we do for unbound problems, since they
* may out of the bounds. We use this function to check whether two coefficients are equivalent.
*/
-def checkCoefficientsEquivalent(coefficients1: Matrix, coefficients2: Matrix): Unit = {
+def checkBoundedMLORCoefficientsEquivalent(coefficients1: Matrix, coefficients2: Matrix): Unit = {
coefficients1.colIter.zip(coefficients2.colIter).foreach { case (col1: Vector, col2: Vector) =>
(col1.asBreeze - col2.asBreeze).toArray.toSeq.sliding(2).foreach {
-case Seq(v1, v2) => assert(v1 ~= v2 absTol 1E-3)
+case Seq(v1, v2) => assert(v1 ~= v2 absTol 1E-2)
}
}
}
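A note on the comparison helper renamed above. The following is a minimal, self-contained Scala sketch (illustration only, not part of the patch; all names are hypothetical) of why bounded multinomial coefficients are only compared up to a per-feature constant: shifting every class's coefficient for the same feature by one constant changes all class logits by the same amount, so the softmax probabilities, and hence the likelihood, do not change.

// Softmax is invariant under a per-feature shift applied to every class.
def softmax(logits: Seq[Double]): Seq[Double] = {
  val m = logits.max
  val exps = logits.map(v => math.exp(v - m))
  exps.map(_ / exps.sum)
}
val x = Seq(1.5, -0.3)                                       // one example, two features
val w1 = Seq(Seq(0.2, 1.0), Seq(-0.4, 0.7), Seq(0.9, -1.1))  // 3 classes x 2 features
val shift = Seq(5.0, -2.0)                                   // same per-feature shift for every class
val w2 = w1.map(row => row.zip(shift).map { case (c, s) => c + s })
def logits(w: Seq[Seq[Double]]): Seq[Double] =
  w.map(row => row.zip(x).map { case (c, v) => c * v }.sum)
val p1 = softmax(logits(w1))
val p2 = softmax(logits(w2))
assert(p1.zip(p2).forall { case (a, b) => math.abs(a - b) < 1e-9 })  // identical class probabilities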
5 changes: 3 additions & 2 deletions pom.xml
@@ -169,7 +169,8 @@
<!-- for now, not running scalafmt as part of default verify pipeline -->
<scalafmt.skip>true</scalafmt.skip>
<codehaus.jackson.version>1.9.13</codehaus.jackson.version>
-<fasterxml.jackson.version>2.10.0</fasterxml.jackson.version>
+<fasterxml.jackson.version>2.10.5</fasterxml.jackson.version>
+<fasterxml.jackson-databind.version>2.10.5.1</fasterxml.jackson-databind.version>
<snappy.version>1.1.8</snappy.version>
<netlib.java.version>1.1.2</netlib.java.version>
<commons-codec.version>1.10</commons-codec.version>
@@ -773,7 +774,7 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
-<version>${fasterxml.jackson.version}</version>
+<version>${fasterxml.jackson-databind.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
@@ -143,6 +143,28 @@ object NormalizeFloatingNumbers extends Rule[LogicalPlan] {

case _ => throw new IllegalStateException(s"fail to normalize $expr")
}

+val FLOAT_NORMALIZER: Any => Any = (input: Any) => {
+val f = input.asInstanceOf[Float]
+if (f.isNaN) {
+Float.NaN
+} else if (f == -0.0f) {
+0.0f
+} else {
+f
+}
+}
+
+val DOUBLE_NORMALIZER: Any => Any = (input: Any) => {
+val d = input.asInstanceOf[Double]
+if (d.isNaN) {
+Double.NaN
+} else if (d == -0.0d) {
+0.0d
+} else {
+d
+}
+}
}

case class NormalizeNaNAndZero(child: Expression) extends UnaryExpression with ExpectsInputTypes {
@@ -152,27 +174,8 @@ case class NormalizeNaNAndZero(child: Expression) extends UnaryExpression with ExpectsInputTypes {
override def inputTypes: Seq[AbstractDataType] = Seq(TypeCollection(FloatType, DoubleType))

private lazy val normalizer: Any => Any = child.dataType match {
-case FloatType => (input: Any) => {
-val f = input.asInstanceOf[Float]
-if (f.isNaN) {
-Float.NaN
-} else if (f == -0.0f) {
-0.0f
-} else {
-f
-}
-}
-
-case DoubleType => (input: Any) => {
-val d = input.asInstanceOf[Double]
-if (d.isNaN) {
-Double.NaN
-} else if (d == -0.0d) {
-0.0d
-} else {
-d
-}
-}
+case FloatType => NormalizeFloatingNumbers.FLOAT_NORMALIZER
+case DoubleType => NormalizeFloatingNumbers.DOUBLE_NORMALIZER
}

override def nullSafeEval(input: Any): Any = {
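A note on the FLOAT_NORMALIZER / DOUBLE_NORMALIZER functions promoted to the companion object above. The standalone sketch below (illustration only, JDK-only code) shows why -0.0 needs canonicalizing: 0.0 and -0.0 compare equal, but their raw bit patterns differ, so any code that keys on the bits, such as hashing or binary comparison of rows, would treat them as distinct unless one canonical form is chosen.

val posBits = java.lang.Double.doubleToRawLongBits(0.0)   // 0x0000000000000000
val negBits = java.lang.Double.doubleToRawLongBits(-0.0)  // 0x8000000000000000
assert(0.0 == -0.0)        // equal under IEEE 754 comparison
assert(posBits != negBits) // but bitwise distinct, hence normalizing -0.0 to 0.0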
@@ -3547,15 +3547,16 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logging {
}

/**
-* Create a [[RepairTableStatement]].
+* Create a [[RepairTable]].
*
* For example:
* {{{
* MSCK REPAIR TABLE multi_part_name
* }}}
*/
override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
-RepairTableStatement(visitMultipartIdentifier(ctx.multipartIdentifier()))
+RepairTable(
+UnresolvedTable(visitMultipartIdentifier(ctx.multipartIdentifier()), "MSCK REPAIR TABLE"))
}

/**
@@ -22,6 +22,7 @@ import java.util

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.XxHash64Function
+import org.apache.spark.sql.catalyst.optimizer.NormalizeFloatingNumbers.{DOUBLE_NORMALIZER, FLOAT_NORMALIZER}
import org.apache.spark.sql.types._

// A helper class for HyperLogLogPlusPlus.
@@ -88,7 +89,12 @@
*
* Variable names in the HLL++ paper match variable names in the code.
*/
-def update(buffer: InternalRow, bufferOffset: Int, value: Any, dataType: DataType): Unit = {
+def update(buffer: InternalRow, bufferOffset: Int, _value: Any, dataType: DataType): Unit = {
+val value = dataType match {
+case FloatType => FLOAT_NORMALIZER.apply(_value)
+case DoubleType => DOUBLE_NORMALIZER.apply(_value)
+case _ => _value
+}
// Create the hashed value 'x'.
val x = XxHash64Function.hash(value, dataType, 42L)

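Why the update path above normalizes the value before hashing: this is an illustrative sketch under the assumption that hash-based distinct counting distinguishes values by their bit patterns. NaN has many valid encodings, none of which compare equal, so without canonicalization two NaNs could register as two distinct values in the HLL++ sketch; the normalizer collapses them (and -0.0) to a single representative first. The payload NaN below reuses the bit pattern from HyperLogLogPlusPlusSuite.

import java.lang.{Double => JDouble}

val canonicalNaN = Double.NaN
val payloadNaN = JDouble.longBitsToDouble(0x7ff1234512345678L)
assert(payloadNaN.isNaN)
// Both are NaN, yet their raw 64-bit encodings differ.
assert(JDouble.doubleToRawLongBits(canonicalNaN) != JDouble.doubleToRawLongBits(payloadNaN))
// A normalizer with the same logic as DOUBLE_NORMALIZER maps both to one canonical bit pattern.
def normalize(d: Double): Double = if (d.isNaN) Double.NaN else if (d == -0.0d) 0.0d else d
assert(JDouble.doubleToRawLongBits(normalize(canonicalNaN)) ==
  JDouble.doubleToRawLongBits(normalize(payloadNaN)))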
@@ -554,4 +554,94 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(GreaterThan(Literal(Float.NaN), Literal(Float.NaN)), false)
checkEvaluation(GreaterThan(Literal(0.0F), Literal(-0.0F)), false)
}

test("SPARK-32110: compare special double/float values in array") {
def createUnsafeDoubleArray(d: Double): Literal = {
Literal(UnsafeArrayData.fromPrimitiveArray(Array(d)), ArrayType(DoubleType))
}
def createSafeDoubleArray(d: Double): Literal = {
Literal(new GenericArrayData(Array(d)), ArrayType(DoubleType))
}
def createUnsafeFloatArray(d: Double): Literal = {
Literal(UnsafeArrayData.fromPrimitiveArray(Array(d.toFloat)), ArrayType(FloatType))
}
def createSafeFloatArray(d: Double): Literal = {
Literal(new GenericArrayData(Array(d.toFloat)), ArrayType(FloatType))
}
def checkExpr(
exprBuilder: (Expression, Expression) => Expression,
left: Double,
right: Double,
expected: Any): Unit = {
// test double
checkEvaluation(
exprBuilder(createUnsafeDoubleArray(left), createUnsafeDoubleArray(right)), expected)
checkEvaluation(
exprBuilder(createUnsafeDoubleArray(left), createSafeDoubleArray(right)), expected)
checkEvaluation(
exprBuilder(createSafeDoubleArray(left), createSafeDoubleArray(right)), expected)
// test float
checkEvaluation(
exprBuilder(createUnsafeFloatArray(left), createUnsafeFloatArray(right)), expected)
checkEvaluation(
exprBuilder(createUnsafeFloatArray(left), createSafeFloatArray(right)), expected)
checkEvaluation(
exprBuilder(createSafeFloatArray(left), createSafeFloatArray(right)), expected)
}

checkExpr(EqualTo, Double.NaN, Double.NaN, true)
checkExpr(EqualTo, Double.NaN, Double.PositiveInfinity, false)
checkExpr(EqualTo, 0.0, -0.0, true)
checkExpr(GreaterThan, Double.NaN, Double.PositiveInfinity, true)
checkExpr(GreaterThan, Double.NaN, Double.NaN, false)
checkExpr(GreaterThan, 0.0, -0.0, false)
}

test("SPARK-32110: compare special double/float values in struct") {
def createUnsafeDoubleRow(d: Double): Literal = {
val dt = new StructType().add("d", "double")
val converter = UnsafeProjection.create(dt)
val unsafeRow = converter.apply(InternalRow(d))
Literal(unsafeRow, dt)
}
def createSafeDoubleRow(d: Double): Literal = {
Literal(InternalRow(d), new StructType().add("d", "double"))
}
def createUnsafeFloatRow(d: Double): Literal = {
val dt = new StructType().add("f", "float")
val converter = UnsafeProjection.create(dt)
val unsafeRow = converter.apply(InternalRow(d.toFloat))
Literal(unsafeRow, dt)
}
def createSafeFloatRow(d: Double): Literal = {
Literal(InternalRow(d.toFloat), new StructType().add("f", "float"))
}
def checkExpr(
exprBuilder: (Expression, Expression) => Expression,
left: Double,
right: Double,
expected: Any): Unit = {
// test double
checkEvaluation(
exprBuilder(createUnsafeDoubleRow(left), createUnsafeDoubleRow(right)), expected)
checkEvaluation(
exprBuilder(createUnsafeDoubleRow(left), createSafeDoubleRow(right)), expected)
checkEvaluation(
exprBuilder(createSafeDoubleRow(left), createSafeDoubleRow(right)), expected)
// test float
checkEvaluation(
exprBuilder(createUnsafeFloatRow(left), createUnsafeFloatRow(right)), expected)
checkEvaluation(
exprBuilder(createUnsafeFloatRow(left), createSafeFloatRow(right)), expected)
checkEvaluation(
exprBuilder(createSafeFloatRow(left), createSafeFloatRow(right)), expected)
}

checkExpr(EqualTo, Double.NaN, Double.NaN, true)
checkExpr(EqualTo, Double.NaN, Double.PositiveInfinity, false)
checkExpr(EqualTo, 0.0, -0.0, true)
checkExpr(GreaterThan, Double.NaN, Double.PositiveInfinity, true)
checkExpr(GreaterThan, Double.NaN, Double.NaN, false)
checkExpr(GreaterThan, 0.0, -0.0, false)
}
}
@@ -17,14 +17,15 @@

package org.apache.spark.sql.catalyst.expressions.aggregate

+import java.lang.{Double => JDouble}
import java.util.Random

import scala.collection.mutable

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BoundReference, SpecificInternalRow}
-import org.apache.spark.sql.types.{DataType, IntegerType}
+import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType}

class HyperLogLogPlusPlusSuite extends SparkFunSuite {

@@ -153,4 +154,25 @@ class HyperLogLogPlusPlusSuite extends SparkFunSuite {
// Check if the buffers are equal.
assert(buffer2 == buffer1a, "Buffers should be equal")
}

test("SPARK-32110: add 0.0 and -0.0") {
val (hll, input, buffer) = createEstimator(0.05, DoubleType)
input.setDouble(0, 0.0)
hll.update(buffer, input)
input.setDouble(0, -0.0)
hll.update(buffer, input)
evaluateEstimate(hll, buffer, 1);
}

test("SPARK-32110: add NaN") {
val (hll, input, buffer) = createEstimator(0.05, DoubleType)
input.setDouble(0, Double.NaN)
hll.update(buffer, input)
val specialNaN = JDouble.longBitsToDouble(0x7ff1234512345678L)
assert(JDouble.isNaN(specialNaN))
assert(JDouble.doubleToRawLongBits(Double.NaN) != JDouble.doubleToRawLongBits(specialNaN))
input.setDouble(0, specialNaN)
hll.update(buffer, input)
evaluateEstimate(hll, buffer, 1);
}
}