Commit 371ab5a

[SPARK-37731][SQL] Refactor and cleanup function lookup in Analyzer
### What changes were proposed in this pull request?

Today, the function lookup code path is pretty hard to understand as it spreads over many places:
1. lookup v1 function
2. lookup v2 function
3. lookup higher-order function
4. lookup table function
5. lookup functions at different levels: built-in, temp and persistent

This PR is a major refactor of the function lookup code path and cleans it up quite a bit. In general, it follows the idea of table lookup:
1. The Analyzer looks up built-in or temp functions first.
2. The Analyzer qualifies the function name with the current catalog and namespace, or with the view catalog/namespace if we are resolving a view.
3. The Analyzer calls the v1 session catalog if the catalog is `spark_catalog`, otherwise it calls the v2 catalog.

With this refactor, the Analyzer acts as the router, and the v1 session catalog only needs a few small methods with very specific goals. The function DDL commands also follow the pattern of the similar table/view commands and fail automatically if the command requires a persistent function but the resolved function is built-in/temp. After this change, it should be simpler to add v2 function DDL commands.

Note that table function lookup stays in its own rule, as table functions have a dedicated function registry and do not share the namespace of scalar functions.

### Why are the changes needed?

Code cleanup to improve readability.

### Does this PR introduce _any_ user-facing change?

Yes. Since Spark 3.3, DESCRIBE FUNCTION fails if the function does not exist. In Spark 3.2 or earlier, DESCRIBE FUNCTION can still run and print "Function: func_name not found".

### How was this patch tested?

Existing tests.

Closes apache#35004 from cloud-fan/func.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
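For illustration only, the lookup order described above can be sketched as follows. This is not the actual Analyzer code; every name and registry below is a stand-in.

```scala
// A minimal, self-contained sketch of the new lookup order; not the actual Analyzer code.
object FunctionLookupSketch {
  sealed trait Resolved
  case class BuiltinOrTemp(name: String) extends Resolved
  case class Persistent(catalog: String, namespace: Seq[String], name: String) extends Resolved

  // Stand-ins for the function registries and the CatalogManager state.
  val builtinOrTempFunctions = Set("abs", "my_temp_func")
  val currentCatalog = "spark_catalog"
  val currentNamespace = Seq("default")

  def lookup(nameParts: Seq[String]): Resolved = nameParts match {
    // 1. Built-in and temporary functions are checked first; they only match single-part names.
    case Seq(name) if builtinOrTempFunctions.contains(name) => BuiltinOrTemp(name)
    // 2. Otherwise qualify the name with the current catalog and namespace
    //    (or the view's catalog/namespace when resolving a view) ...
    case Seq(name) => Persistent(currentCatalog, currentNamespace, name)
    // ... or take catalog/namespace from the qualified name itself. Simplification: the real
    // code first checks whether the leading part is actually a registered catalog.
    case parts => Persistent(parts.head, parts.drop(1).dropRight(1), parts.last)
  }
  // 3. The caller then routes: `spark_catalog` goes to the v1 session catalog,
  //    any other catalog goes through the v2 FunctionCatalog API.
}
```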
1 parent 189b205 commit 371ab5a

File tree: 40 files changed (+951, -698 lines)

docs/sql-migration-guide.md

Lines changed: 2 additions & 0 deletions
```diff
@@ -54,6 +54,8 @@ license: |
 
   - Since Spark 3.3, nulls are written as empty strings in CSV data source by default. In Spark 3.2 or earlier, nulls were written as empty strings as quoted empty strings, `""`. To restore the previous behavior, set `nullValue` to `""`.
 
+  - Since Spark 3.3, DESCRIBE FUNCTION fails if the function does not exist. In Spark 3.2 or earlier, DESCRIBE FUNCTION can still run and print "Function: func_name not found".
+
 ## Upgrading from Spark SQL 3.1 to 3.2
 
   - Since Spark 3.2, ADD FILE/JAR/ARCHIVE commands require each path to be enclosed by `"` or `'` if the path contains whitespaces.
```
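A hedged illustration of the DESCRIBE FUNCTION behavior change (assuming an active SparkSession `spark`; the exact error message differs between versions):

```scala
// Spark 3.2 and earlier: this runs and the output contains "Function: does_not_exist not found".
// Spark 3.3 and later: analysis fails because the function cannot be resolved.
try {
  spark.sql("DESCRIBE FUNCTION does_not_exist").show(truncate = false)
} catch {
  case e: org.apache.spark.sql.AnalysisException =>
    println(s"DESCRIBE FUNCTION failed: ${e.getMessage}")
}
```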

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/CatalogExtension.java

Lines changed: 1 addition & 1 deletion
```diff
@@ -30,7 +30,7 @@
  * @since 3.0.0
  */
 @Evolving
-public interface CatalogExtension extends TableCatalog, SupportsNamespaces {
+public interface CatalogExtension extends TableCatalog, FunctionCatalog, SupportsNamespaces {
 
 /**
  * This will be called only once by Spark to pass in the Spark built-in session catalog, after
```

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java

Lines changed: 23 additions & 6 deletions
```diff
@@ -20,10 +20,8 @@
 import java.util.Map;
 
 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
-import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
-import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.*;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -161,11 +159,30 @@ public boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException
     return asNamespaceCatalog().dropNamespace(namespace);
   }
 
+  @Override
+  public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
+    return asFunctionCatalog().loadFunction(ident);
+  }
+
+  @Override
+  public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
+    return asFunctionCatalog().listFunctions(namespace);
+  }
+
+  @Override
+  public boolean functionExists(Identifier ident) {
+    return asFunctionCatalog().functionExists(ident);
+  }
+
   private TableCatalog asTableCatalog() {
-    return (TableCatalog)delegate;
+    return (TableCatalog) delegate;
   }
 
   private SupportsNamespaces asNamespaceCatalog() {
-    return (SupportsNamespaces)delegate;
+    return (SupportsNamespaces) delegate;
+  }
+
+  private FunctionCatalog asFunctionCatalog() {
+    return (FunctionCatalog) delegate;
   }
 }
```
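Because the delegation now covers the FunctionCatalog methods, a custom session catalog extension inherits persistent-function lookup without extra code. A minimal sketch (the class name and config value below are hypothetical):

```scala
import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension

// Hypothetical extension: it overrides nothing function-related, so loadFunction,
// listFunctions and functionExists are all forwarded to the built-in session catalog
// that Spark passes in via setDelegateCatalog.
class MyCatalogExtension extends DelegatingCatalogExtension

// Registered like any session catalog extension, e.g. in spark-defaults.conf:
//   spark.sql.catalog.spark_catalog=com.example.MyCatalogExtension
```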

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/FunctionCatalog.java

Lines changed: 13 additions & 0 deletions
```diff
@@ -50,4 +50,17 @@ public interface FunctionCatalog extends CatalogPlugin {
    */
   UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException;
 
+  /**
+   * Returns true if the function exists, false otherwise.
+   *
+   * @since 3.3.0
+   */
+  default boolean functionExists(Identifier ident) {
+    try {
+      loadFunction(ident);
+      return true;
+    } catch (NoSuchFunctionException e) {
+      return false;
+    }
+  }
 }
```
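The new default `functionExists` simply loads the function and treats `NoSuchFunctionException` as "does not exist". A catalog that can answer the question more cheaply may override it; the sketch below uses a hypothetical in-memory catalog to show the idea (illustration only, not a Spark-provided class):

```scala
import scala.collection.mutable

import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException
import org.apache.spark.sql.connector.catalog.{FunctionCatalog, Identifier}
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical in-memory function catalog, purely for illustration.
class InMemoryFunctionCatalog extends FunctionCatalog {
  private val functions = mutable.Map.empty[Identifier, UnboundFunction]
  private var catalogName: String = _

  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    catalogName = name
  }
  override def name(): String = catalogName

  override def listFunctions(namespace: Array[String]): Array[Identifier] =
    functions.keys.toArray

  override def loadFunction(ident: Identifier): UnboundFunction =
    functions.getOrElse(ident, throw new NoSuchFunctionException(ident))

  // Cheaper than the default implementation, which calls loadFunction and
  // catches NoSuchFunctionException.
  override def functionExists(ident: Identifier): Boolean = functions.contains(ident)
}
```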

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 220 additions & 165 deletions
Large diffs are not rendered by default.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 4 additions & 0 deletions
```diff
@@ -125,6 +125,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
       case u: UnresolvedRelation =>
         u.failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")
 
+      case u: UnresolvedFunc =>
+        throw QueryCompilationErrors.noSuchFunctionError(
+          u.multipartIdentifier, u, u.possibleQualifiedName)
+
       case u: UnresolvedHint =>
         u.failAnalysis(s"Hint not found: ${u.name}")
```
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala

Lines changed: 31 additions & 0 deletions
```diff
@@ -743,6 +743,37 @@ object FunctionRegistry {
 
   val functionSet: Set[FunctionIdentifier] = builtin.listFunction().toSet
 
+  private def makeExprInfoForVirtualOperator(name: String, usage: String): ExpressionInfo = {
+    new ExpressionInfo(
+      null,
+      null,
+      name,
+      usage,
+      "",
+      "",
+      "",
+      "",
+      "",
+      "",
+      "built-in")
+  }
+
+  val builtinOperators: Map[String, ExpressionInfo] = Map(
+    "<>" -> makeExprInfoForVirtualOperator("<>",
+      "expr1 <> expr2 - Returns true if `expr1` is not equal to `expr2`."),
+    "!=" -> makeExprInfoForVirtualOperator("!=",
+      "expr1 != expr2 - Returns true if `expr1` is not equal to `expr2`."),
+    "between" -> makeExprInfoForVirtualOperator("between",
+      "expr1 [NOT] BETWEEN expr2 AND expr3 - " +
+        "evaluate if `expr1` is [not] in between `expr2` and `expr3`."),
+    "case" -> makeExprInfoForVirtualOperator("case",
+      "CASE expr1 WHEN expr2 THEN expr3 [WHEN expr4 THEN expr5]* [ELSE expr6] END " +
+        "- When `expr1` = `expr2`, returns `expr3`; when `expr1` = `expr4`, return `expr5`; " +
+        "else return `expr6`."),
+    "||" -> makeExprInfoForVirtualOperator("||",
+      "expr1 || expr2 - Returns the concatenation of `expr1` and `expr2`.")
+  )
+
   /**
    * Create a SQL function builder and corresponding `ExpressionInfo`.
    * @param name The function name.
```
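These synthetic entries exist so that operators handled directly by the parser, which are not present in the regular function registry, can still be described. For example (assuming an active SparkSession `spark`; the exact quoting and output layout are assumptions):

```scala
// Operators and keywords such as BETWEEN are not registered functions, so DESCRIBE
// FUNCTION answers from the builtinOperators map instead of the function registry.
spark.sql("DESCRIBE FUNCTION `between`").show(truncate = false)
// Expected to report, roughly:
//   Function: between
//   Usage: expr1 [NOT] BETWEEN expr2 AND expr3 - evaluate if `expr1` is [not] in between `expr2` and `expr3`.
```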

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala

Lines changed: 3 additions & 1 deletion
```diff
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.catalyst.plans.logical.{DropTable, DropView, LogicalPlan, NoopCommand, UncacheTable}
+import org.apache.spark.sql.catalyst.plans.logical.{DropFunction, DropTable, DropView, LogicalPlan, NoopCommand, UncacheTable}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
 
@@ -35,5 +35,7 @@ object ResolveCommandsWithIfExists extends Rule[LogicalPlan] {
       NoopCommand("DROP VIEW", u.multipartIdentifier)
     case UncacheTable(u: UnresolvedRelation, ifExists, _) if ifExists =>
       NoopCommand("UNCACHE TABLE", u.multipartIdentifier)
+    case DropFunction(u: UnresolvedFunc, ifExists) if ifExists =>
+      NoopCommand("DROP FUNCTION", u.multipartIdentifier)
   }
 }
```
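With the new case, DROP FUNCTION IF EXISTS on a function that cannot be resolved is rewritten to a no-op during analysis instead of surfacing an error. For example (assuming an active SparkSession `spark`):

```scala
// Succeeds as a no-op even though the function does not exist.
spark.sql("DROP FUNCTION IF EXISTS default.no_such_function")

// Without IF EXISTS, the unresolved function reaches CheckAnalysis (see the
// UnresolvedFunc case added above) and an AnalysisException is thrown.
// spark.sql("DROP FUNCTION default.no_such_function")
```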

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala

Lines changed: 0 additions & 42 deletions
```diff
@@ -21,50 +21,8 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern._
-import org.apache.spark.sql.connector.catalog.{CatalogManager, LookupCatalog}
-import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types.DataType
 
-/**
- * Resolve a higher order functions from the catalog. This is different from regular function
- * resolution because lambda functions can only be resolved after the function has been resolved;
- * so we need to resolve higher order function when all children are either resolved or a lambda
- * function.
- */
-case class ResolveHigherOrderFunctions(catalogManager: CatalogManager)
-  extends Rule[LogicalPlan] with LookupCatalog {
-
-  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveExpressionsWithPruning(
-    _.containsPattern(LAMBDA_FUNCTION), ruleId) {
-    case u @ UnresolvedFunction(AsFunctionIdentifier(ident), children, false, filter, ignoreNulls)
-        if hasLambdaAndResolvedArguments(children) =>
-      withPosition(u) {
-        catalogManager.v1SessionCatalog.lookupFunction(ident, children) match {
-          case func: HigherOrderFunction =>
-            filter.foreach(_.failAnalysis("FILTER predicate specified, " +
-              s"but ${func.prettyName} is not an aggregate function"))
-            if (ignoreNulls) {
-              throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
-                func.prettyName, "IGNORE NULLS")
-            }
-            func
-          case other => other.failAnalysis(
-            "A lambda function should only be used in a higher order function. However, " +
-            s"its class is ${other.getClass.getCanonicalName}, which is not a " +
-            s"higher order function.")
-        }
-      }
-  }
-
-  /**
-   * Check if the arguments of a function are either resolved or a lambda function.
-   */
-  private def hasLambdaAndResolvedArguments(expressions: Seq[Expression]): Boolean = {
-    val (lambdas, others) = expressions.partition(_.isInstanceOf[LambdaFunction])
-    lambdas.nonEmpty && others.forall(_.resolved)
-  }
-}
-
 /**
  * Resolve the lambda variables exposed by a higher order functions.
 *
```
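The deleted rule existed because a lambda argument can only be resolved after the enclosing higher-order function itself has been resolved; with this PR that handling moves into the unified function lookup in the Analyzer. For reference, a query that goes through this path (assuming an active SparkSession `spark`):

```scala
// `x -> x + 1` is a LambdaFunction; it can only be resolved once `transform`
// has been resolved to a HigherOrderFunction, which is why this lookup is special.
spark.sql("SELECT transform(array(1, 2, 3), x -> x + 1) AS plus_one").show()
// +---------+
// | plus_one|
// +---------+
// |[2, 3, 4]|
// +---------+
```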

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala

Lines changed: 23 additions & 9 deletions
```diff
@@ -23,9 +23,10 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, LeafExpression, Une
 import org.apache.spark.sql.catalyst.plans.logical.LeafNode
 import org.apache.spark.sql.catalyst.trees.TreePattern.{TreePattern, UNRESOLVED_FUNC}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
-import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, Table, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, Table, TableCatalog}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.types.{DataType, StructField}
 
 /**
@@ -52,7 +53,7 @@ case class UnresolvedTable(
 }
 
 /**
- * Holds the name of a view that has yet to be looked up in a catalog. It will be resolved to
+ * Holds the name of a view that has yet to be looked up. It will be resolved to
  * [[ResolvedView]] during analysis.
  */
 case class UnresolvedView(
@@ -115,10 +116,15 @@ case class UnresolvedFieldPosition(position: ColumnPosition) extends FieldPositi
 }
 
 /**
- * Holds the name of a function that has yet to be looked up in a catalog. It will be resolved to
- * [[ResolvedFunc]] during analysis.
+ * Holds the name of a function that has yet to be looked up. It will be resolved to
+ * [[ResolvedPersistentFunc]] or [[ResolvedNonPersistentFunc]] during analysis.
  */
-case class UnresolvedFunc(multipartIdentifier: Seq[String]) extends LeafNode {
+case class UnresolvedFunc(
+    multipartIdentifier: Seq[String],
+    commandName: String,
+    requirePersistent: Boolean,
+    funcTypeMismatchHint: Option[String],
+    possibleQualifiedName: Option[Seq[String]] = None) extends LeafNode {
   override lazy val resolved: Boolean = false
   override def output: Seq[Attribute] = Nil
   final override val nodePatterns: Seq[TreePattern] = Seq(UNRESOLVED_FUNC)
@@ -190,15 +196,23 @@ case class ResolvedView(identifier: Identifier, isTemp: Boolean) extends LeafNod
 }
 
 /**
- * A plan containing resolved function.
+ * A plan containing resolved persistent function.
  */
-// TODO: create a generic representation for v1, v2 function, after we add function
-// support to v2 catalog. For now we only need the identifier to fallback to v1 command.
-case class ResolvedFunc(identifier: Identifier)
+case class ResolvedPersistentFunc(
+    catalog: FunctionCatalog,
+    identifier: Identifier,
+    func: UnboundFunction)
   extends LeafNode {
   override def output: Seq[Attribute] = Nil
 }
 
+/**
+ * A plan containing resolved non-persistent (temp or built-in) function.
+ */
+case class ResolvedNonPersistentFunc(name: String, func: UnboundFunction) extends LeafNode {
+  override def output: Seq[Attribute] = Nil
+}
+
 /**
  * A plan containing resolved database object name with catalog determined.
 */
```