Skip to content

Commit b5ce32f

Browse files
beliefercloud-fan
authored andcommitted
[SPARK-39162][SQL][3.3] Jdbc dialect should decide which function could be pushed down
### What changes were proposed in this pull request? This PR used to back port #36521 to 3.3 ### Why are the changes needed? Let function push-down more flexible. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? Exists tests. Closes #36556 from beliefer/SPARK-39162_3.3. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent ec6fc74 commit b5ce32f

File tree

3 files changed

+23
-28
lines changed

3 files changed

+23
-28
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2411,8 +2411,4 @@ object QueryCompilationErrors extends QueryErrorsBase {
24112411
new AnalysisException(
24122412
"Sinks cannot request distribution and ordering in continuous execution mode")
24132413
}
2414-
2415-
def noSuchFunctionError(database: String, funcInfo: String): Throwable = {
2416-
new AnalysisException(s"$database does not support function: $funcInfo")
2417-
}
24182414
}

sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,41 +20,21 @@ package org.apache.spark.sql.jdbc
2020
import java.sql.{SQLException, Types}
2121
import java.util.Locale
2222

23-
import scala.util.control.NonFatal
24-
2523
import org.apache.spark.sql.AnalysisException
2624
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
27-
import org.apache.spark.sql.connector.expressions.Expression
2825
import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, GeneralAggregateFunc}
29-
import org.apache.spark.sql.errors.QueryCompilationErrors
3026
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
3127
import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, ShortType, StringType}
3228

3329
private object H2Dialect extends JdbcDialect {
3430
override def canHandle(url: String): Boolean =
3531
url.toLowerCase(Locale.ROOT).startsWith("jdbc:h2")
3632

37-
class H2SQLBuilder extends JDBCSQLBuilder {
38-
override def visitSQLFunction(funcName: String, inputs: Array[String]): String = {
39-
funcName match {
40-
case "WIDTH_BUCKET" =>
41-
val functionInfo = super.visitSQLFunction(funcName, inputs)
42-
throw QueryCompilationErrors.noSuchFunctionError("H2", functionInfo)
43-
case _ => super.visitSQLFunction(funcName, inputs)
44-
}
45-
}
46-
}
33+
private val supportedFunctions =
34+
Set("ABS", "COALESCE", "LN", "EXP", "POWER", "SQRT", "FLOOR", "CEIL")
4735

48-
override def compileExpression(expr: Expression): Option[String] = {
49-
val h2SQLBuilder = new H2SQLBuilder()
50-
try {
51-
Some(h2SQLBuilder.build(expr))
52-
} catch {
53-
case NonFatal(e) =>
54-
logWarning("Error occurs while compiling V2 expression", e)
55-
None
56-
}
57-
}
36+
override def isSupportedFunction(funcName: String): Boolean =
37+
supportedFunctions.contains(funcName)
5838

5939
override def compileAggregate(aggFunction: AggregateFunc): Option[String] = {
6040
super.compileAggregate(aggFunction).orElse(

sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,27 @@ abstract class JdbcDialect extends Serializable with Logging{
240240
getJDBCType(dataType).map(_.databaseTypeDefinition).getOrElse(dataType.typeName)
241241
s"CAST($l AS $databaseTypeDefinition)"
242242
}
243+
244+
override def visitSQLFunction(funcName: String, inputs: Array[String]): String = {
245+
if (isSupportedFunction(funcName)) {
246+
s"""$funcName(${inputs.mkString(", ")})"""
247+
} else {
248+
// The framework will catch the error and give up the push-down.
249+
// Please see `JdbcDialect.compileExpression(expr: Expression)` for more details.
250+
throw new UnsupportedOperationException(
251+
s"${this.getClass.getSimpleName} does not support function: $funcName")
252+
}
253+
}
243254
}
244255

256+
/**
257+
* Returns whether the database supports function.
258+
* @param funcName Upper-cased function name
259+
* @return True if the database supports function.
260+
*/
261+
@Since("3.3.0")
262+
def isSupportedFunction(funcName: String): Boolean = false
263+
245264
/**
246265
* Converts V2 expression to String representing a SQL expression.
247266
* @param expr The V2 expression to be converted.

0 commit comments

Comments
 (0)