Commit 500f55f

wangyum authored and GitHub Enterprise committed
[CARMEL-7173][SPARK-24497][SQL] Support recursive SQL (apache#22)
### What changes were proposed in this pull request?

This PR adds the recursive query feature to Spark SQL. A recursive query is defined using the `WITH RECURSIVE` keywords and refers to the name of the common table expression within the query. The implementation complies with the SQL standard and follows rules similar to other relational databases:

- A query is made of an anchor term followed by a recursive term.
- The anchor term doesn't contain a self reference and is used to initialize the query.
- The recursive term contains a self reference and is used to expand the current set of rows with new ones.
- The anchor and recursive terms must be joined with each other by `UNION` or `UNION ALL` operators.
- New rows can only be derived from the newly added rows of the previous iteration (or from the initial set of rows of the anchor term). This limitation implies that recursive references can't be used with some joins, aggregations or subqueries.

Please see `cte-recursive.sql` for some examples.

The implementation has the same limitation that [SPARK-36447](https://issues.apache.org/jira/browse/SPARK-36447) / apache#33671 has:

> With-CTEs mixed with SQL commands or DMLs will still go through the old inline code path because of our non-standard language specs and not-unified command/DML interfaces.

which means that recursive queries are not supported in SQL commands and DMLs. With apache#42036 this restriction is lifted, and a recursive CTE only doesn't work when the CTE is force-inlined (`spark.sql.legacy.inlineCTEInCommands=true` or the command is a multi-insert statement).

### Why are the changes needed?

Recursive query is an ANSI SQL feature that is useful for processing hierarchical data.

### Does this PR introduce _any_ user-facing change?

Yes, it adds the recursive query feature.

### How was this patch tested?

Added new UTs and tests in `cte-recursion.sql`.
1 parent 0fe4dd7 commit 500f55f
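
For orientation, here is a minimal illustrative query (not taken from the commit) that follows the rules listed above: an anchor term, a self-referencing recursive term, and `UNION ALL` joining the two.

```sql
-- Illustrative sketch only, not from the commit: a simple counting CTE.
WITH RECURSIVE t AS (
  SELECT 1 AS n                       -- anchor term: no self reference
  UNION ALL
  SELECT n + 1 FROM t WHERE n < 5     -- recursive term: references t itself
)
SELECT * FROM t;                      -- yields 1, 2, 3, 4, 5
```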

File tree

58 files changed (+6808, -802 lines)


common/utils/src/main/resources/error/error-classes.json

Lines changed: 37 additions & 0 deletions
@@ -1708,6 +1708,33 @@
     ],
     "sqlState" : "42602"
   },
+  "INVALID_RECURSIVE_CTE" : {
+    "message" : [
+      "Invalid recursive definition found. Recursive queries must contain an UNION or an UNION ALL statement with 2 children. The first child needs to be the anchor term without any recursive references."
+    ]
+  },
+  "INVALID_RECURSIVE_REFERENCE" : {
+    "message" : [
+      "Invalid recursive reference found."
+    ],
+    "subClass" : {
+      "DATA_TYPE" : {
+        "message" : [
+          "The data type of recursive references cannot change during resolution. Originally it was <fromDataType> but after resolution is <toDataType>."
+        ]
+      },
+      "NUMBER" : {
+        "message" : [
+          "Recursive references cannot be used multiple times."
+        ]
+      },
+      "PLACE" : {
+        "message" : [
+          "Recursive references cannot be used on the right side of left outer/semi/anti joins, on the left side of right outer joins, in full outer joins and in aggregates."
+        ]
+      }
+    }
+  },
   "INVALID_SCHEMA" : {
     "message" : [
       "The input schema <inputSchema> is not a valid schema string."
@@ -2363,6 +2390,16 @@
       "QUALIFY expression '<sqlExpr>' must contain a window function."
     ]
   },
+  "RECURSIVE_CTE_IN_LEGACY_MODE" : {
+    "message" : [
+      "Recursive definitions cannot be used in legacy CTE precedence mode (spark.sql.legacy.ctePrecedencePolicy=LEGACY)."
+    ]
+  },
+  "RECURSIVE_CTE_WHEN_INLINING_IS_FORCED" : {
+    "message" : [
+      "Recursive definitions cannot be used when CTE inlining is forced."
+    ]
+  },
   "RECURSIVE_PROTOBUF_SCHEMA" : {
     "message" : [
       "Found recursive reference in Protobuf schema, which can not be processed by Spark by default: <fieldDescriptor>. try setting the option `recursive.fields.max.depth` 0 to 10. Going beyond 10 levels of recursion is not allowed."
docs/sql-error-conditions-invalid-recursive-reference-error-class.md

Lines changed: 40 additions & 0 deletions

@@ -0,0 +1,40 @@
+---
+layout: global
+title: INVALID_RECURSIVE_REFERENCE error class
+displayTitle: INVALID_RECURSIVE_REFERENCE error class
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+SQLSTATE: none assigned
+
+Invalid recursive reference found.
+
+This error class has the following derived error classes:
+
+## DATA_TYPE
+
+The data type of recursive references cannot change during resolution. Originally it was `<fromDataType>` but after resolution is `<toDataType>`.
+
+## NUMBER
+
+Recursive references cannot be used multiple times.
+
+## PLACE
+
+Recursive references cannot be used on the right side of left outer/semi/anti joins, on the left side of right outer joins, in full outer joins and in aggregates.
+
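
As a hedged illustration of the `NUMBER` subclass described above (not taken from the commit), a recursive term that self-joins the CTE references it twice:

```sql
-- Illustrative sketch only: the self reference t appears twice in the
-- recursive term, which the NUMBER subclass disallows.
WITH RECURSIVE t AS (
  SELECT 1 AS n
  UNION ALL
  SELECT a.n + 1 FROM t AS a JOIN t AS b ON a.n = b.n WHERE a.n < 5
)
SELECT * FROM t;
```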

docs/sql-error-conditions.md

Lines changed: 26 additions & 0 deletions
@@ -1036,6 +1036,20 @@ For more details see [INVALID_PARTITION_OPERATION](sql-error-conditions-invalid-
 
 `<value>` is an invalid property value, please use quotes, e.g. SET `<key>`=`<value>`
 
+### INVALID_RECURSIVE_CTE
+
+SQLSTATE: none assigned
+
+Invalid recursive definition found. Recursive queries must contain an UNION or an UNION ALL statement with 2 children. The first child needs to be the anchor term without any recursive references.
+
+### [INVALID_RECURSIVE_REFERENCE](sql-error-conditions-invalid-recursive-reference-error-class.html)
+
+SQLSTATE: none assigned
+
+Invalid recursive reference found.
+
+For more details see [INVALID_RECURSIVE_REFERENCE](sql-error-conditions-invalid-recursive-reference-error-class.html)
+
 ### [INVALID_SCHEMA](sql-error-conditions-invalid-schema-error-class.html)
 
 [SQLSTATE: 42K07](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
@@ -1515,6 +1529,18 @@ SQLSTATE: none assigned
 
 QUALIFY expression '`<sqlExpr>`' must contain a window function.
 
+### RECURSIVE_CTE_IN_LEGACY_MODE
+
+SQLSTATE: none assigned
+
+Recursive definitions cannot be used in legacy CTE precedence mode (spark.sql.legacy.ctePrecedencePolicy=LEGACY).
+
+### RECURSIVE_CTE_WHEN_INLINING_IS_FORCED
+
+SQLSTATE: none assigned
+
+Recursive definitions cannot be used when CTE inlining is forced.
+
 ### RECURSIVE_PROTOBUF_SCHEMA
 
 SQLSTATE: none assigned
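
For illustration (not part of the commit), `RECURSIVE_CTE_IN_LEGACY_MODE` is expected when the legacy precedence policy named in the message is enabled:

```sql
-- Illustrative sketch only: under the legacy CTE precedence policy the
-- same recursive definition is rejected.
SET spark.sql.legacy.ctePrecedencePolicy=LEGACY;

WITH RECURSIVE t AS (
  SELECT 1 AS n
  UNION ALL
  SELECT n + 1 FROM t WHERE n < 5
)
SELECT * FROM t;
```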

docs/sql-ref-ansi-compliance.md

Lines changed: 1 addition & 0 deletions
@@ -574,6 +574,7 @@ Below is a list of all the keywords in Spark SQL.
 |RECORDREADER|non-reserved|non-reserved|non-reserved|
 |RECORDWRITER|non-reserved|non-reserved|non-reserved|
 |RECOVER|non-reserved|non-reserved|non-reserved|
+|RECURSIVE|reserved|non-reserved|reserved|
 |REDUCE|non-reserved|non-reserved|non-reserved|
 |REFERENCES|reserved|non-reserved|reserved|
 |REFRESH|non-reserved|non-reserved|non-reserved|
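
Since `RECURSIVE` becomes reserved under ANSI mode, a bare identifier with that name should no longer parse there; a hedged sketch (not from the commit):

```sql
-- Illustrative sketch only: RECURSIVE is reserved in ANSI mode.
SET spark.sql.ansi.enabled=true;

SELECT 1 AS recursive;     -- expected to fail: reserved keyword used as identifier
SELECT 1 AS `recursive`;   -- quoting the identifier still works
```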

sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4

Lines changed: 1 addition & 0 deletions
@@ -312,6 +312,7 @@ REAL: 'REAL';
 RECORDREADER: 'RECORDREADER';
 RECORDWRITER: 'RECORDWRITER';
 RECOVER: 'RECOVER';
+RECURSIVE: 'RECURSIVE';
 REDUCE: 'REDUCE';
 REFERENCES: 'REFERENCES';
 REFRESH: 'REFRESH';

sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4

Lines changed: 2 additions & 1 deletion
@@ -364,7 +364,7 @@ describeColName
     ;
 
 ctes
-    : WITH namedQuery (COMMA namedQuery)*
+    : WITH RECURSIVE? namedQuery (COMMA namedQuery)*
     ;
 
 namedQuery
@@ -1792,6 +1792,7 @@ nonReserved
     | RECORDREADER
     | RECORDWRITER
     | RECOVER
+    | RECURSIVE
     | REDUCE
     | REFERENCES
     | REFRESH
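
Per the updated `ctes` rule, `RECURSIVE` is written once after `WITH` and covers the whole list of named queries; an illustrative query (not taken from the commit):

```sql
-- Illustrative sketch only: RECURSIVE appears once after WITH; only t
-- recurses, base is an ordinary CTE in the same list.
WITH RECURSIVE
  base AS (SELECT 0 AS n),
  t AS (
    SELECT n FROM base
    UNION ALL
    SELECT n + 1 FROM t WHERE n < 3
  )
SELECT * FROM t;
```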

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 3 additions & 1 deletion
@@ -347,7 +347,9 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     Batch("Cleanup", fixedPoint,
       CleanupAliases),
     Batch("HandleSpecialCommand", Once,
-      HandleSpecialCommand)
+      HandleSpecialCommand),
+    Batch("Insert Loops", Once,
+      InsertLoops)
   )
 
   /**

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala

Lines changed: 117 additions & 31 deletions
@@ -123,7 +123,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
       startOfQuery: Boolean = true): Unit = {
     val resolver = conf.resolver
     plan match {
-      case UnresolvedWith(child, relations) =>
+      case UnresolvedWith(child, relations, _) =>
         val newNames = ArrayBuffer.empty[String]
         newNames ++= outerCTERelationNames
         relations.foreach {
@@ -149,10 +149,15 @@
       plan: LogicalPlan,
       cteDefs: ArrayBuffer[CTERelationDef]): LogicalPlan = {
     plan.resolveOperatorsUp {
-      case UnresolvedWith(child, relations) =>
-        val resolvedCTERelations =
-          resolveCTERelations(relations, isLegacy = true, forceInline = false, Seq.empty, cteDefs)
-        substituteCTE(child, alwaysInline = true, resolvedCTERelations)
+      case w @ UnresolvedWith(child, relations, allowRecursion) =>
+        if (allowRecursion) {
+          w.failAnalysis(
+            errorClass = "RECURSIVE_CTE_IN_LEGACY_MODE",
+            messageParameters = Map.empty)
+        }
+        val resolvedCTERelations = resolveCTERelations(relations, isLegacy = true,
+          forceInline = false, Seq.empty, cteDefs, allowRecursion)
+        substituteCTE(child, alwaysInline = true, resolvedCTERelations, None)._1
     }
   }
 
@@ -202,14 +207,20 @@
     var firstSubstituted: Option[LogicalPlan] = None
     val newPlan = plan.resolveOperatorsDownWithPruning(
       _.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) {
-      case UnresolvedWith(child: LogicalPlan, relations) =>
+      case w @ UnresolvedWith(child, relations, allowRecursion) =>
+        if (allowRecursion && forceInline) {
+          w.failAnalysis(
+            errorClass = "RECURSIVE_CTE_WHEN_INLINING_IS_FORCED",
+            messageParameters = Map.empty)
+        }
         val resolvedCTERelations =
-          resolveCTERelations(relations, isLegacy = false, forceInline, outerCTEDefs, cteDefs) ++
-            outerCTEDefs
+          resolveCTERelations(relations, isLegacy = false, forceInline, outerCTEDefs, cteDefs,
+            allowRecursion) ++ outerCTEDefs
         val substituted = substituteCTE(
           traverseAndSubstituteCTE(child, forceInline, resolvedCTERelations, cteDefs)._1,
           forceInline,
-          resolvedCTERelations)
+          resolvedCTERelations,
+          None)._1
         if (firstSubstituted.isEmpty) {
          firstSubstituted = Some(substituted)
         }
@@ -228,7 +239,8 @@
       isLegacy: Boolean,
       forceInline: Boolean,
       outerCTEDefs: Seq[(String, CTERelationDef)],
-      cteDefs: ArrayBuffer[CTERelationDef]): Seq[(String, CTERelationDef)] = {
+      cteDefs: ArrayBuffer[CTERelationDef],
+      allowRecursion: Boolean): Seq[(String, CTERelationDef)] = {
     val alwaysInline = isLegacy || forceInline
     var resolvedCTERelations = if (alwaysInline) {
       Seq.empty
@@ -247,53 +259,127 @@
           // NOTE: we must call `traverseAndSubstituteCTE` before `substituteCTE`, as the relations
          // in the inner CTE have higher priority over the relations in the outer CTE when resolving
          // inner CTE relations. For example:
-          // WITH t1 AS (SELECT 1)
-          // t2 AS (
-          //   WITH t1 AS (SELECT 2)
-          //   WITH t3 AS (SELECT * FROM t1)
-          // )
+          // WITH
+          //   t1 AS (SELECT 1),
+          //   t2 AS (
+          //     WITH
+          //       t1 AS (SELECT 2),
+          //       t3 AS (SELECT * FROM t1)
+          //     SELECT * FROM t1
+          //   )
+          // SELECT * FROM t2
           // t3 should resolve the t1 to `SELECT 2` instead of `SELECT 1`.
-          traverseAndSubstituteCTE(relation, forceInline, resolvedCTERelations, cteDefs)._1
+          //
+          // When recursion allowed:
+          // - don't add current definition to outer definitions of `traverseAndSubstituteCTE()` to
+          //   prevent recursion inside inner CTEs.
+          //   E.g. the following query will not resolve `t1` within `t2`:
+          //   WITH RECURSIVE
+          //     t1 AS (
+          //       SELECT 1 AS level
+          //       UNION (
+          //         WITH t2 AS (SELECT level + 1 FROM t1 WHERE level < 10)
+          //         SELECT * FROM t2
+          //       )
+          //     )
+          //   SELECT * FROM t1
+          // - remove definitions that conflict with current relation `name` from outer definitions of
+          //   `traverseAndSubstituteCTE()` to prevent weird resolutions.
+          //   E.g. we don't want to resolve `t1` within `t3` to `SELECT 1`:
+          //   WITH
+          //     t1 AS (SELECT 1),
+          //     t2 AS (
+          //       WITH RECURSIVE
+          //         t1 AS (
+          //           SELECT 1 AS level
+          //           UNION (
+          //             WITH t3 AS (SELECT level + 1 FROM t1 WHERE level < 10)
+          //             SELECT * FROM t3
+          //           )
+          //         )
+          //       SELECT * FROM t1
+          //     )
+          //   SELECT * FROM t2
+          val nonConflictingCTERelations = if (allowRecursion) {
+            resolvedCTERelations.filterNot {
+              case (cteName, cteDef) => cteDef.conf.resolver(cteName, name)
+            }
+          } else {
+            resolvedCTERelations
+          }
+          traverseAndSubstituteCTE(relation, forceInline, nonConflictingCTERelations, cteDefs)._1
+        }
+
+        // If recursion is allowed then it has higher priority than outer or previous relations so
+        // construct a not yet substituted but recursive `CTERelationDef`, that we will prepend to
+        // `resolvedCTERelations`.
+        val recursiveCTERelation = if (allowRecursion) {
+          Some(name -> CTERelationDef(relation, recursive = true))
+        } else {
+          None
         }
-        // CTE definition can reference a previous one
-        val substituted = substituteCTE(innerCTEResolved, alwaysInline, resolvedCTERelations)
-        val cteRelation = CTERelationDef(substituted)
+
+        // CTE definition can reference a previous one or itself if recursion allowed.
+        val (substituted, recursionFound) = substituteCTE(innerCTEResolved, alwaysInline,
+          resolvedCTERelations, recursiveCTERelation)
+        val cteRelation = recursiveCTERelation
+          .map(_._2.copy(child = substituted, recursive = recursionFound))
+          .getOrElse(CTERelationDef(substituted))
         if (!alwaysInline) {
           cteDefs += cteRelation
         }
+
+        // From this point any reference to the definition is non-recursive.
+        val nonRecursiveCTERelation = if (cteRelation.recursive) {
+          cteRelation.copy(recursive = false)
+        } else {
+          cteRelation
+        }
+
         // Prepending new CTEs makes sure that those have higher priority over outer ones.
-        resolvedCTERelations +:= (name -> cteRelation)
+        resolvedCTERelations +:= (name -> nonRecursiveCTERelation)
     }
     resolvedCTERelations
   }
 
   private def substituteCTE(
       plan: LogicalPlan,
       alwaysInline: Boolean,
-      cteRelations: Seq[(String, CTERelationDef)]): LogicalPlan = {
-    plan.resolveOperatorsUpWithPruning(
+      cteRelations: Seq[(String, CTERelationDef)],
+      recursiveCTERelation: Option[(String, CTERelationDef)]): (LogicalPlan, Boolean) = {
+    var recursionFound = false
+    val substituted = plan.resolveOperatorsUpWithPruning(
      _.containsAnyPattern(RELATION_TIME_TRAVEL, UNRESOLVED_RELATION, PLAN_EXPRESSION)) {
      case RelationTimeTravel(UnresolvedRelation(Seq(table), _, _), _, _)
          if cteRelations.exists(r => plan.conf.resolver(r._1, table)) =>
        throw QueryCompilationErrors.timeTravelUnsupportedError(toSQLId(table))
 
      case u @ UnresolvedRelation(Seq(table), _, _) =>
-        cteRelations.find(r => plan.conf.resolver(r._1, table)).map { case (_, d) =>
-          if (alwaysInline) {
-            d.child
-          } else {
-            // Add a `SubqueryAlias` for hint-resolving rules to match relation names.
-            SubqueryAlias(table, CTERelationRef(d.id, d.resolved, d.output))
-          }
-        }.getOrElse(u)
+        (recursiveCTERelation ++ cteRelations)
+          .find(r => plan.conf.resolver(r._1, table))
+          .map { case (_, d) =>
+            if (alwaysInline) {
+              d.child
+            } else {
+              if (d.recursive) {
+                recursionFound = true
+              }
+              // Add a `SubqueryAlias` for hint-resolving rules to match relation names.
+              SubqueryAlias(table,
+                CTERelationRef(d.id, d.resolved, d.output, recursive = d.recursive))
+            }
          }.getOrElse(u)
 
      case other =>
        // This cannot be done in ResolveSubquery because ResolveSubquery does not know the CTE.
        other.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
          case e: SubqueryExpression =>
-            e.withNewPlan(apply(substituteCTE(e.plan, alwaysInline, cteRelations)))
+            e.withNewPlan(
+              apply(substituteCTE(e.plan, alwaysInline, cteRelations, None)._1))
        }
    }
+
+    (substituted, recursionFound)
  }
 
  /**
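
The resolution-priority rules spelled out in the comments above can be seen in a query like the following (illustrative, not from the commit): inside the recursive definition, the self reference shadows an outer CTE of the same name.

```sql
-- Illustrative sketch only: within t2 the recursive t1 shadows the outer
-- t1 (SELECT 0), so the self reference resolves to the recursive
-- definition and the inner query counts 1, 2, 3.
WITH
  t1 AS (SELECT 0 AS n),
  t2 AS (
    WITH RECURSIVE t1 AS (
      SELECT 1 AS n
      UNION ALL
      SELECT n + 1 FROM t1 WHERE n < 3
    )
    SELECT * FROM t1
  )
SELECT * FROM t2;
```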
