
Commit 1bb272d

beliefer authored and cloud-fan committed
[SPARK-39453][SQL] DS V2 supports push down misc non-aggregate functions (non ANSI)
### What changes were proposed in this pull request?

#36039 made DS V2 support push-down of the misc non-aggregate functions that are claimed by the ANSI standard. Spark also has many commonly used misc non-aggregate functions that are not claimed by the ANSI standard: https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L362. The mainstream databases that support these functions are shown below.

| Function name | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | Singlestore | ElasticSearch | SQLite | Influxdata | Sybase |
| ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- |
| `GREATEST` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | No | No | No |
| `LEAST` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | No | No | No |
| `IF` | No | Yes | No | Yes | No | No | Yes | No | Yes | No | No | Yes | No | Yes | Yes | Yes | No | No | Yes | Yes | Yes | No | No |
| `RAND` | No | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | No | Yes |

### Why are the changes needed?

DS V2 should support push-down of the misc non-aggregate functions supported by mainstream databases.

### Does this PR introduce _any_ user-facing change?

'No'. New feature.

### How was this patch tested?

New tests.

Closes #36830 from beliefer/SPARK-38761_followup.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 4ad7386 commit 1bb272d
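To illustrate the user-facing effect, here is a minimal sketch based on the new tests in JDBCV2Suite. It assumes a `SparkSession` named `spark` with a DS V2 JDBC catalog called `h2` registered over an H2 database that contains a `test.employee` table, as set up in that suite; none of those names are created by this commit.

```scala
// Sketch only: the "h2" catalog and test.employee table mirror the JDBCV2Suite setup.
val df = spark.sql(
  """
    |SELECT * FROM h2.test.employee
    |WHERE GREATEST(bonus, 1100) > 1200 AND LEAST(salary, 10000) > 9000 AND RAND(1) < 1
    |""".stripMargin)

// With this change the whole predicate is compiled to H2 SQL and pushed down;
// the plan reports it roughly as:
// PushedFilters: [(GREATEST(BONUS, 1100.0)) > 1200.0, (LEAST(SALARY, 10000.00)) > 9000.00, RAND(1) < 1.0]
df.explain(true)
```

As the new df12 test below shows, `IF(...)` is pushed down as a `CASE WHEN` expression.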

File tree: 5 files changed (+71, −2 lines)

sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java

Lines changed: 18 additions & 0 deletions
@@ -106,6 +106,24 @@
  * <li>Since version: 3.3.0</li>
  * </ul>
  * </li>
+ * <li>Name: <code>GREATEST</code>
+ * <ul>
+ *  <li>SQL semantic: <code>GREATEST(expr, ...)</code></li>
+ *  <li>Since version: 3.4.0</li>
+ * </ul>
+ * </li>
+ * <li>Name: <code>LEAST</code>
+ * <ul>
+ *  <li>SQL semantic: <code>LEAST(expr, ...)</code></li>
+ *  <li>Since version: 3.4.0</li>
+ * </ul>
+ * </li>
+ * <li>Name: <code>RAND</code>
+ * <ul>
+ *  <li>SQL semantic: <code>RAND([seed])</code></li>
+ *  <li>Since version: 3.4.0</li>
+ * </ul>
+ * </li>
  * <li>Name: <code>LN</code>
  * <ul>
  *  <li>SQL semantic: <code>LN(expr)</code></li>
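For connector authors consuming these pushed expressions, the sketch below shows one way to render the newly documented names into target SQL. It is an assumption about typical connector code, not part of this commit; it relies only on `GeneralScalarExpression.name()`/`children()` and, for leaves, on the default `toString` of `NamedReference` and `Literal`, which is used here purely for illustration.

```scala
import org.apache.spark.sql.connector.expressions.{Expression, GeneralScalarExpression, Literal, NamedReference}

// Hypothetical helper: translate a pushed-down expression, handling the three new
// function names and giving up (None) on anything it cannot translate.
def toTargetSQL(expr: Expression): Option[String] = expr match {
  case f: GeneralScalarExpression if f.name == "GREATEST" || f.name == "LEAST" =>
    // Variadic: GREATEST(expr, ...) / LEAST(expr, ...); bail out if any child is untranslatable.
    val args = f.children.toSeq.map(toTargetSQL)
    if (args.forall(_.isDefined)) Some(s"${f.name}(${args.flatten.mkString(", ")})") else None
  case f: GeneralScalarExpression if f.name == "RAND" =>
    // RAND([seed]): zero children when the seed is hidden, one child otherwise.
    if (f.children.isEmpty) Some("RAND()")
    else toTargetSQL(f.children.head).map(seed => s"RAND($seed)")
  case ref: NamedReference => Some(ref.toString) // column reference, e.g. BONUS
  case lit: Literal[_] => Some(lit.toString)     // constant, e.g. 1100.0 (default rendering, sketch only)
  case _ => None // leave anything else for Spark to evaluate
}
```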

sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java

Lines changed: 3 additions & 0 deletions
@@ -97,6 +97,9 @@ public String build(Expression expr) {
           return visitUnaryArithmetic(name, inputToSQL(e.children()[0]));
         case "ABS":
         case "COALESCE":
+        case "GREATEST":
+        case "LEAST":
+        case "RAND":
         case "LN":
         case "EXP":
         case "POWER":

sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala

Lines changed: 28 additions & 0 deletions
@@ -99,6 +99,27 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) {
       } else {
         None
       }
+    case Greatest(children) =>
+      val childrenExpressions = children.flatMap(generateExpression(_))
+      if (children.length == childrenExpressions.length) {
+        Some(new GeneralScalarExpression("GREATEST", childrenExpressions.toArray[V2Expression]))
+      } else {
+        None
+      }
+    case Least(children) =>
+      val childrenExpressions = children.flatMap(generateExpression(_))
+      if (children.length == childrenExpressions.length) {
+        Some(new GeneralScalarExpression("LEAST", childrenExpressions.toArray[V2Expression]))
+      } else {
+        None
+      }
+    case Rand(child, hideSeed) =>
+      if (hideSeed) {
+        Some(new GeneralScalarExpression("RAND", Array.empty[V2Expression]))
+      } else {
+        generateExpression(child)
+          .map(v => new GeneralScalarExpression("RAND", Array[V2Expression](v)))
+      }
     case Log(child) => generateExpression(child)
       .map(v => new GeneralScalarExpression("LN", Array[V2Expression](v)))
     case Exp(child) => generateExpression(child)
@@ -195,6 +216,13 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) {
       } else {
         None
       }
+    case iff: If =>
+      val childrenExpressions = iff.children.flatMap(generateExpression(_))
+      if (iff.children.length == childrenExpressions.length) {
+        Some(new GeneralScalarExpression("CASE_WHEN", childrenExpressions.toArray[V2Expression]))
+      } else {
+        None
+      }
     case substring: Substring =>
       val children = if (substring.len == Literal(Integer.MAX_VALUE)) {
         Seq(substring.str, substring.pos)

sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala

Lines changed: 2 additions & 2 deletions
@@ -35,8 +35,8 @@ private[sql] object H2Dialect extends JdbcDialect {
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:h2")
 
   private val supportedFunctions =
-    Set("ABS", "COALESCE", "LN", "EXP", "POWER", "SQRT", "FLOOR", "CEIL",
-      "SUBSTRING", "UPPER", "LOWER", "TRANSLATE", "TRIM")
+    Set("ABS", "COALESCE", "GREATEST", "LEAST", "RAND", "LN", "EXP", "POWER", "SQRT", "FLOOR",
+      "CEIL", "SUBSTRING", "UPPER", "LOWER", "TRANSLATE", "TRIM")
 
   override def isSupportedFunction(funcName: String): Boolean =
     supportedFunctions.contains(funcName)
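H2Dialect is the only built-in dialect opting in here. As a hedged sketch, any other JdbcDialect could advertise the same functions the same way; the dialect name and the jdbc:mydb URL prefix below are hypothetical, not something this commit defines.

```scala
import java.util.Locale
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

// Hypothetical dialect for illustration only.
object MyDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean =
    url.toLowerCase(Locale.ROOT).startsWith("jdbc:mydb")

  // Only list functions the target database can actually evaluate; anything not
  // listed is simply not pushed down and stays in Spark.
  private val supportedFunctions = Set("GREATEST", "LEAST", "RAND")

  override def isSupportedFunction(funcName: String): Boolean =
    supportedFunctions.contains(funcName)
}

// Registering the dialect makes it eligible for URL matching at read time.
JdbcDialects.registerDialect(MyDialect)
```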

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala

Lines changed: 20 additions & 0 deletions
@@ -775,6 +775,26 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     checkFiltersRemoved(df10)
     checkPushedInfo(df10, "PushedFilters: [ID IS NOT NULL, ID > 1], ")
     checkAnswer(df10, Row("mary", 2))
+
+    val df11 = sql(
+      """
+        |SELECT * FROM h2.test.employee
+        |WHERE GREATEST(bonus, 1100) > 1200 AND LEAST(salary, 10000) > 9000 AND RAND(1) < 1
+        |""".stripMargin)
+    checkFiltersRemoved(df11)
+    checkPushedInfo(df11, "PushedFilters: " +
+      "[(GREATEST(BONUS, 1100.0)) > 1200.0, (LEAST(SALARY, 10000.00)) > 9000.00, RAND(1) < 1.0]")
+    checkAnswer(df11, Row(2, "david", 10000, 1300, true))
+
+    val df12 = sql(
+      """
+        |SELECT * FROM h2.test.employee
+        |WHERE IF(SALARY > 10000, SALARY, LEAST(SALARY, 1000)) > 1200
+        |""".stripMargin)
+    checkFiltersRemoved(df12)
+    checkPushedInfo(df12, "PushedFilters: " +
+      "[(CASE WHEN SALARY > 10000.00 THEN SALARY ELSE LEAST(SALARY, 1000.00) END) > 1200.00]")
+    checkAnswer(df12, Seq(Row(2, "alex", 12000, 1200, false), Row(6, "jen", 12000, 1200, true)))
   }
 
   test("scan with filter push-down with ansi mode") {
