[SPARK-42052][SQL] Codegen Support for HiveSimpleUDF #39865

Closed · wants to merge 3 commits
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -817,6 +817,14 @@ private[hive] trait HiveInspectors {
cache
}

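/**
 * Variant of wrap that takes the values as an Array rather than a Seq, so the
 * generated code added in this PR, which collects child values into an
 * Object[], can delegate to the existing Seq-based overload.
 */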
def wrap(
row: Array[Any],
wrappers: Array[(Any) => Any],
cache: Array[AnyRef],
dataTypes: Array[DataType]): Array[AnyRef] = {
wrap(row.toSeq, wrappers, cache, dataTypes)
}

/**
* @param dataType Catalyst data type
* @return Hive java object inspector (recursively), not the Writable ObjectInspector
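For orientation, a minimal sketch (in Scala, with illustrative names) of what the generated code does with this overload; udf stands for the HiveSimpleUDF instance that doGenCode registers via ctx.addReferenceObj("this", this):

// Paraphrase of the emitted Java: collect the child values into an Object[],
// then wrap them all in one call instead of building a Seq first.
val inputs: Array[Any] = Array[Any]("Hello", "Spark SQL")
val wrapped: Array[AnyRef] = udf.wrap(inputs, udf.wrappers, udf.cached, udf.inputDataTypes)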
sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala (67 additions, 6 deletions)
@@ -49,7 +49,6 @@ private[hive] case class HiveSimpleUDF(
name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression])
extends Expression
with HiveInspectors
- with CodegenFallback
with Logging
with UserDefinedExpression {

@@ -61,7 +60,7 @@ private[hive] case class HiveSimpleUDF(
lazy val function = funcWrapper.createFunction[UDF]()

@transient
- private lazy val method =
+ protected lazy val method =
function.getResolver.getEvalMethod(children.map(_.dataType.toTypeInfo).asJava)

@transient
@@ -77,22 +76,22 @@ private[hive] case class HiveSimpleUDF(

// Create parameter converters
@transient
- private lazy val conversionHelper = new ConversionHelper(method, arguments)
+ protected lazy val conversionHelper = new ConversionHelper(method, arguments)

override lazy val dataType = javaTypeToDataType(method.getGenericReturnType)

@transient
- private lazy val wrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray
+ protected lazy val wrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray

@transient
lazy val unwrapper = unwrapperFor(ObjectInspectorFactory.getReflectionObjectInspector(
method.getGenericReturnType, ObjectInspectorOptions.JAVA))

@transient
- private lazy val cached: Array[AnyRef] = new Array[AnyRef](children.length)
+ protected lazy val cached: Array[AnyRef] = new Array[AnyRef](children.length)

@transient
- private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray
+ protected lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray

// TODO: Finish input output types.
override def eval(input: InternalRow): Any = {
@@ -104,6 +103,67 @@ private[hive] case class HiveSimpleUDF(
unwrapper(ret)
}

override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val refTerm = ctx.addReferenceObj("this", this)
val evals = children.map(_.genCode(ctx))
val resultType = CodeGenerator.boxedType(dataType)
val resultTerm = ctx.freshName("result")
val inputsTerm = ctx.freshName("inputs")
val inputsWrapTerm = ctx.freshName("inputsWrap")

val initInputs =
s"""
|Object[] $inputsTerm = new Object[${evals.size}];
|""".stripMargin

val setInputs = evals.zipWithIndex.map {
case (eval, i) =>
s"""
|if (${eval.isNull}) {
| $inputsTerm[$i] = null;
|} else {
| $inputsTerm[$i] = ${eval.value};
|}
|""".stripMargin
}

val inputsWrap = {
s"""
|Object[] $inputsWrapTerm = $refTerm.wrap($inputsTerm, $refTerm.wrappers(),
| $refTerm.cached(), $refTerm.inputDataTypes());
|""".stripMargin
}

ev.copy(code =
code"""
|${evals.map(_.code).mkString("\n")}
|$initInputs
|${setInputs.mkString("\n")}
|$inputsWrap
|$resultType $resultTerm = null;
|boolean ${ev.isNull} = false;
|try {
| $resultTerm = ($resultType) $refTerm.unwrapper().apply(
| org.apache.hadoop.hive.ql.exec.FunctionRegistry.invoke(
| $refTerm.method(),
| $refTerm.function(),
| $refTerm.conversionHelper().convertIfNecessary($inputsWrapTerm)));
| ${ev.isNull} = $resultTerm == null;
|} catch (Throwable e) {
| throw QueryExecutionErrors.failedExecuteUserDefinedFunctionError(
| "${funcWrapper.functionClassName}",
| "${children.map(_.dataType.catalogString).mkString(", ")}",
| "${dataType.catalogString}",
| e);
|}
|${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
|if (!${ev.isNull}) {
| ${ev.value} = $resultTerm;
|}
|""".stripMargin
)
}
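// Note: the code generated above mirrors the interpreted eval path: wrap the
// child values with the Hive ObjectInspector wrappers, convert them through
// ConversionHelper, invoke the UDF method reflectively via
// FunctionRegistry.invoke, and unwrap the Hive result into a Catalyst value.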

override def toString: String = {
s"$nodeName#${funcWrapper.functionClassName}(${children.mkString(",")})"
}
@@ -194,6 +254,7 @@ private[hive] case class HiveGenericUDF(

override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression =
copy(children = newChildren)

override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val refTerm = ctx.addReferenceObj("this", this)
val childrenEvals = children.map(_.genCode(ctx))
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.hadoop.hive.ql.metadata.HiveException
import org.apache.hadoop.hive.ql.udf.{UDAFPercentile, UDFType}
import org.apache.hadoop.hive.ql.udf.generic._
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject
@@ -743,6 +744,38 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
}
}
}

test("SPARK-42052: HiveSimpleUDF Codegen Support") {
withUserDefinedFunction("CodeGenHiveSimpleUDF" -> false) {
sql(s"CREATE FUNCTION CodeGenHiveSimpleUDF AS '${classOf[UDFStringString].getName}'")
withTable("HiveSimpleUDFTable") {
sql(s"create table HiveSimpleUDFTable as select 'Spark SQL' as v")
val df = sql("SELECT CodeGenHiveSimpleUDF('Hello', v) from HiveSimpleUDFTable")
val plan = df.queryExecution.executedPlan
assert(plan.isInstanceOf[WholeStageCodegenExec])
checkAnswer(df, Seq(Row("Hello Spark SQL")))
}
}
}

test("SPARK-42052: HiveSimpleUDF Codegen Support w/ execution failure") {
withUserDefinedFunction("CodeGenHiveSimpleUDF" -> false) {
sql(s"CREATE FUNCTION CodeGenHiveSimpleUDF AS '${classOf[SimpleUDFAssertTrue].getName}'")
withTable("HiveSimpleUDFTable") {
sql(s"create table HiveSimpleUDFTable as select false as v")
val df = sql("SELECT CodeGenHiveSimpleUDF(v) from HiveSimpleUDFTable")
checkError(
exception = intercept[SparkException](df.collect()).getCause.asInstanceOf[SparkException],
errorClass = "FAILED_EXECUTE_UDF",
parameters = Map(
"functionName" -> s"${classOf[SimpleUDFAssertTrue].getName}",
"signature" -> "boolean",
"result" -> "boolean"
)
)
}
}
}
}

class TestPair(x: Int, y: Int) extends Writable with Serializable {
@@ -844,3 +877,12 @@ class ListFiles extends UDF {
if (fileArray != null) Arrays.asList(fileArray: _*) else new ArrayList[String]()
}
}

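// Test-only UDF modeled on Hive's assert_true: throws HiveException when the
// condition is false, exercising the FAILED_EXECUTE_UDF error path tested above.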
class SimpleUDFAssertTrue extends UDF {
def evaluate(condition: Boolean): Boolean = {
if (!condition) {
throw new HiveException("ASSERT_TRUE(): assertion failed.")
}
condition
}
}