
Commit 97f8933

cloud-fan authored and Jackey Lee committed
[SPARK-26580][SQL] remove Scala 2.11 hack for Scala UDF
## What changes were proposed in this pull request?

In apache#22732, we tried our best to keep the behavior of Scala UDF unchanged in Spark 2.4. However, since Spark 3.0, Scala 2.12 is the default, and the trick that was used to keep the behavior unchanged doesn't work with Scala 2.12. This PR proposes to remove the Scala 2.11 hack, as it is no longer useful.

## How was this patch tested?

Existing tests.

Closes apache#23498 from cloud-fan/udf.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 6f0a26c commit 97f8933

File tree

6 files changed: +23 -40 lines changed

docs/sql-migration-guide-upgrade.md

Lines changed: 2 additions & 0 deletions
@@ -43,6 +43,8 @@ displayTitle: Spark SQL Upgrading Guide
 
   - Since Spark 3.0, JSON datasource and JSON function `schema_of_json` infer TimestampType from string values if they match to the pattern defined by the JSON option `timestampFormat`. Set JSON option `inferTimestamp` to `false` to disable such type inferring.
 
+  - In Spark version 2.4 and earlier, if `org.apache.spark.sql.functions.udf(Any, DataType)` gets a Scala closure with a primitive-type argument, the returned UDF returns null if the input value is null. Since Spark 3.0, the UDF returns the default value of the Java type if the input value is null. For example, with `val f = udf((x: Int) => x, IntegerType)`, `f($"x")` returns null in Spark 2.4 and earlier if column `x` is null, and 0 in Spark 3.0. This behavior change is introduced because Spark 3.0 is built with Scala 2.12 by default.
+
 ## Upgrading From Spark SQL 2.3 to 2.4
 
   - In Spark version 2.3 and earlier, the second parameter to array_contains function is implicitly promoted to the element type of first array type parameter. This type promotion can be lossy and may cause `array_contains` function to return wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some change in behavior and are illustrated in the table below.
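
To make the documented behavior change concrete, here is a minimal sketch (not part of the commit) of the before/after difference, assuming a spark-shell style session with `spark.implicits._` in scope:

```scala
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.IntegerType

// Untyped variant: the closure is passed as Any, so Spark cannot see
// that x is a primitive Int and cannot insert a proper null check.
val f = udf((x: Int) => x, IntegerType)
val df = Seq(new Integer(1), null).toDF("x")

df.select(f($"x")).show()
// Spark 2.4 and earlier: 1, null  (null input short-circuits to null)
// Spark 3.0:             1, 0     (null is unboxed to the Java default for int)
```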

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala

Lines changed: 0 additions & 17 deletions
@@ -957,23 +957,6 @@ trait ScalaReflection extends Logging {
     tpe.dealias.erasure.typeSymbol.asClass.fullName
   }
 
-  /**
-   * Returns the nullability of the input parameter types of the scala function object.
-   *
-   * Note that this only works with Scala 2.11, and the information returned may be inaccurate if
-   * used with a different Scala version.
-   */
-  def getParameterTypeNullability(func: AnyRef): Seq[Boolean] = {
-    if (!Properties.versionString.contains("2.11")) {
-      logWarning(s"Scala ${Properties.versionString} cannot get type nullability correctly via " +
-        "reflection, thus Spark cannot add proper input null check for UDF. To avoid this " +
-        "problem, use the typed UDF interfaces instead.")
-    }
-    val methods = func.getClass.getMethods.filter(m => m.getName == "apply" && !m.isBridge)
-    assert(methods.length == 1)
-    methods.head.getParameterTypes.map(!_.isPrimitive)
-  }
-
   /**
    * Returns the parameter names and types for the primary constructor of this type.
    *
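
For readers wondering why the removed helper was 2.11-only: the following standalone sketch (not Spark code) reproduces the reflection trick, and the comments describe the compiler behavior that broke it — which matches the commit's rationale for removing it:

```scala
// Under Scala 2.11 a closure like `(x: Int) => x` compiles to an anonymous
// class whose real `apply` takes a primitive `int`, so this reports false
// (input must not be null). Under Scala 2.12 the closure is emitted via
// LambdaMetafactory against the generic `Function1.apply(Object): Object`,
// the parameter type shows up as `Object`, and the result is a misleading
// true -- the information is simply no longer recoverable by reflection.
def parameterTypeNullability(func: AnyRef): Seq[Boolean] = {
  val methods = func.getClass.getMethods.filter(m => m.getName == "apply" && !m.isBridge)
  methods.head.getParameterTypes.map(cls => !cls.isPrimitive).toSeq
}

val f: AnyRef = (x: Int) => x
println(parameterTypeNullability(f))  // false on Scala 2.11, true on Scala 2.12
```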

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala

Lines changed: 0 additions & 12 deletions
@@ -54,18 +54,6 @@ case class ScalaUDF(
     udfDeterministic: Boolean = true)
   extends Expression with NonSQLExpression with UserDefinedExpression {
 
-  // The constructor for SPARK 2.1 and 2.2
-  def this(
-      function: AnyRef,
-      dataType: DataType,
-      children: Seq[Expression],
-      inputTypes: Seq[DataType],
-      udfName: Option[String]) = {
-    this(
-      function, dataType, children, ScalaReflection.getParameterTypeNullability(function),
-      inputTypes, udfName, nullable = true, udfDeterministic = true)
-  }
-
   override lazy val deterministic: Boolean = udfDeterministic && children.forall(_.deterministic)
 
   override def toString: String =
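
With the auxiliary constructor gone, callers must pass the per-input nullability explicitly instead of having it derived from the closure by reflection. A hypothetical call site is sketched below; the keyword parameter names are inferred from the surrounding diff, so treat this as an assumption rather than the exact API:

```scala
import org.apache.spark.sql.catalyst.expressions.{Literal, ScalaUDF}
import org.apache.spark.sql.types.IntegerType

val closure: AnyRef = (x: Int) => x + 1

// The fourth argument (per-input nullability) is now always supplied by the
// caller; no constructor derives it from the closure anymore.
val expr = ScalaUDF(
  closure,
  IntegerType,
  children = Seq(Literal(1)),
  inputsNullSafe = Seq(false),  // primitive Int input: the analyzer adds a null check
  inputTypes = Seq(IntegerType))
```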

sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala

Lines changed: 1 addition & 11 deletions
@@ -102,17 +102,7 @@ private[sql] case class SparkUserDefinedFunction(
     // It's possible that some of the inputs don't have a specific type(e.g. `Any`), skip type
     // check and null check for them.
     val inputTypes = inputSchemas.map(_.map(_.dataType).getOrElse(AnyDataType))
-
-    val inputsNullSafe = if (inputSchemas.isEmpty) {
-      // This is for backward compatibility of `functions.udf(AnyRef, DataType)`. We need to
-      // do reflection of the lambda function object and see if its arguments are nullable or not.
-      // This doesn't work for Scala 2.12 and we should consider removing this workaround, as Spark
-      // uses Scala 2.12 by default since 3.0.
-      ScalaReflection.getParameterTypeNullability(f)
-    } else {
-      inputSchemas.map(_.map(_.nullable).getOrElse(true))
-    }
-
+    val inputsNullSafe = inputSchemas.map(_.map(_.nullable).getOrElse(true))
     ScalaUDF(
       f,
       dataType,
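
The simplified nullability rule is easy to check in isolation. A self-contained sketch, using a stand-in for `ScalaReflection.Schema` (which carries a `nullable` flag):

```scala
// Stand-in for ScalaReflection.Schema: only the nullable flag matters here.
case class Schema(nullable: Boolean)

// Typed UDF interfaces supply Some(schema); the untyped udf(Any, DataType)
// path supplies None, which is conservatively treated as null-safe, so no
// null check is inserted for that input.
val inputSchemas: Seq[Option[Schema]] = Seq(Some(Schema(nullable = false)), None)
val inputsNullSafe: Seq[Boolean] = inputSchemas.map(_.map(_.nullable).getOrElse(true))

println(inputsNullSafe)  // List(false, true)
// false -> the analyzer adds a null check for this (primitive) input;
// true  -> the value is passed through as-is, which is what produces the
//          "Java default value for null" behavior in the untyped case.
```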

sql/core/src/main/scala/org/apache/spark/sql/functions.scala

Lines changed: 7 additions & 0 deletions
@@ -4250,6 +4250,13 @@ object functions {
    * By default the returned UDF is deterministic. To change it to nondeterministic, call the
    * API `UserDefinedFunction.asNondeterministic()`.
    *
+   * Note that, although the Scala closure can have primitive-type function arguments, it doesn't
+   * work well with null values. Because the Scala closure is passed in as an Any type, there is no
+   * type information for the function arguments. Without the type information, Spark may blindly
+   * pass null to the Scala closure with a primitive-type argument, and the closure will see the
+   * default value of the Java type for the null argument, e.g. for `udf((x: Int) => x, IntegerType)`
+   * the result is 0 for null input.
+   *
    * @param f A closure in Scala
    * @param dataType The output data type of the UDF
    *
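
The documented caveat points toward the typed `udf` interfaces as the safe alternative; a brief hedged comparison of the two variants:

```scala
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.IntegerType

// Untyped: the closure arrives as Any, argument types are unknown,
// so a null input is unboxed to the Java default (0 for Int).
val untyped = udf((x: Int) => x, IntegerType)

// Typed: Spark sees the Int parameter type and inserts a null check,
// so a null input yields null instead of 0.
val typed = udf((x: Int) => x)
```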

sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala

Lines changed: 13 additions & 0 deletions
@@ -423,6 +423,19 @@ class UDFSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-25044 Verify null input handling for primitive types - with udf(Any, DataType)") {
+    val f = udf((x: Int) => x, IntegerType)
+    checkAnswer(
+      Seq(new Integer(1), null).toDF("x").select(f($"x")),
+      Row(1) :: Row(0) :: Nil)
+
+    val f2 = udf((x: Double) => x, DoubleType)
+    checkAnswer(
+      Seq(new java.lang.Double(1.1), null).toDF("x").select(f2($"x")),
+      Row(1.1) :: Row(0.0) :: Nil)
+
+  }
+
   test("SPARK-26308: udf with decimal") {
     val df1 = spark.createDataFrame(
       sparkContext.parallelize(Seq(Row(new BigDecimal("2011000000000002456556")))),
