Skip to content

Commit 18ddaaa

Browse files
committed
add resolved logical plan for UPDATE TABLE
1 parent 275e044 commit 18ddaaa

File tree

11 files changed

+178
-63
lines changed

11 files changed

+178
-63
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1696,8 +1696,8 @@ class Analyzer(
16961696
// Only a few unary nodes (Project/Filter/Aggregate) can contain subqueries.
16971697
case q: UnaryNode if q.childrenResolved =>
16981698
resolveSubQueries(q, q.children)
1699-
case d: DeleteFromTable if d.childrenResolved =>
1700-
resolveSubQueries(d, d.children)
1699+
case s: SupportsSubquery if s.childrenResolved =>
1700+
resolveSubQueries(s, s.children)
17011701
}
17021702
}
17031703

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -593,19 +593,19 @@ trait CheckAnalysis extends PredicateHelper {
593593
// Only certain operators are allowed to host subquery expressions containing
594594
// outer references.
595595
plan match {
596-
case _: Filter | _: Aggregate | _: Project | _: DeleteFromTable => // Ok
596+
case _: Filter | _: Aggregate | _: Project | _: SupportsSubquery => // Ok
597597
case other => failAnalysis(
598598
"Correlated scalar sub-queries can only be used in a " +
599-
s"Filter/Aggregate/Project: $plan")
599+
s"Filter/Aggregate/Project and a few commands: $plan")
600600
}
601601
}
602602

603603
case inSubqueryOrExistsSubquery =>
604604
plan match {
605-
case _: Filter | _: DeleteFromTable => // Ok
605+
case _: Filter | _: SupportsSubquery => // Ok
606606
case _ =>
607607
failAnalysis(s"IN/EXISTS predicate sub-queries can only be used in" +
608-
s" Filter/DeleteFromTable: $plan")
608+
s" Filter and a few commands: $plan")
609609
}
610610
}
611611

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,12 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
9696
val aliased = tableAlias.map(SubqueryAlias(_, r)).getOrElse(r)
9797
DeleteFromTable(aliased, condition)
9898

99-
case update: UpdateTableStatement =>
100-
throw new AnalysisException(s"UPDATE TABLE is not supported temporarily.")
99+
case u @ UpdateTableStatement(
100+
nameParts @ CatalogAndIdentifierParts(catalog, tableName), _, _, _, _) =>
101+
val r = UnresolvedV2Relation(nameParts, catalog.asTableCatalog, tableName.asIdentifier)
102+
val aliased = u.tableAlias.map(SubqueryAlias(_, r)).getOrElse(r)
103+
val columns = u.columns.map(UnresolvedAttribute(_))
104+
UpdateTable(aliased, columns, u.values, u.condition)
101105

102106
case DescribeTableStatement(
103107
nameParts @ NonSessionCatalog(catalog, tableName), partitionSpec, isExtended) =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,16 @@ object IntegerLiteral {
233233
}
234234
}
235235

236+
/**
237+
* Extractor for retrieving String literals.
238+
*/
239+
object StringLiteral {
240+
def unapply(a: Any): Option[String] = a match {
241+
case Literal(s: UTF8String, StringType) => Some(s.toString)
242+
case _ => None
243+
}
244+
}
245+
236246
/**
237247
* Extractor and other utility methods for decimal literals.
238248
*/

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -598,13 +598,6 @@ case class DescribeTable(table: NamedRelation, isExtended: Boolean) extends Comm
598598
override val output = DescribeTableSchema.describeTableAttributes()
599599
}
600600

601-
case class DeleteFromTable(
602-
child: LogicalPlan,
603-
condition: Option[Expression]) extends Command {
604-
605-
override def children: Seq[LogicalPlan] = child :: Nil
606-
}
607-
608601
/**
609602
* Drop a table.
610603
*/
@@ -1241,6 +1234,12 @@ case class Deduplicate(
12411234
override def output: Seq[Attribute] = child.output
12421235
}
12431236

1237+
/**
1238+
* A trait to represent the commands that support subqueries.
1239+
* This is used to whitelist such commands in the subquery-related checks.
1240+
*/
1241+
trait SupportsSubquery extends LogicalPlan
1242+
12441243
/** A trait used for logical plan nodes that create or replace V2 table definitions. */
12451244
trait V2CreateTablePlan extends LogicalPlan {
12461245
def tableName: Identifier
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.plans.logical
19+
20+
import org.apache.spark.sql.catalyst.expressions.Expression
21+
22+
case class DeleteFromTable(
23+
table: LogicalPlan,
24+
condition: Option[Expression]) extends Command with SupportsSubquery {
25+
override def children: Seq[LogicalPlan] = table :: Nil
26+
}
27+
28+
case class UpdateTable(
29+
table: LogicalPlan,
30+
columns: Seq[Expression],
31+
values: Seq[Expression],
32+
condition: Option[Expression]) extends Command with SupportsSubquery {
33+
override def children: Seq[LogicalPlan] = table :: Nil
34+
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/UpdateTableStatement.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,6 @@ import org.apache.spark.sql.catalyst.expressions.Expression
2222
case class UpdateTableStatement(
2323
tableName: Seq[String],
2424
tableAlias: Option[String],
25-
attrs: Seq[Seq[String]],
25+
columns: Seq[Seq[String]],
2626
values: Seq[Expression],
2727
condition: Option[Expression]) extends ParsedStatement

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -532,7 +532,7 @@ class AnalysisErrorSuite extends AnalysisTest {
532532
Seq(a, Alias(InSubquery(Seq(a), ListQuery(LocalRelation(b))), "c")()),
533533
LocalRelation(a))
534534
assertAnalysisError(plan, "Predicate sub-queries can only be used" +
535-
" in Filter/DeleteFromTable" :: Nil)
535+
" in Filter" :: Nil)
536536
}
537537

538538
test("PredicateSubQuery is used is a nested condition") {

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -745,6 +745,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
745745
case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
746746
case r: LogicalRDD =>
747747
RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil
748+
case _: UpdateTable =>
749+
throw new UnsupportedOperationException(s"UPDATE TABLE is not supported temporarily.")
748750
case _ => Nil
749751
}
750752
}

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

Lines changed: 41 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -938,19 +938,19 @@ class DataSourceV2SQLSuite
938938
val errorMsg = "Found duplicate column(s) in the table definition of `t`"
939939
Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
940940
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
941-
testCreateAnalysisError(
941+
assertAnalysisError(
942942
s"CREATE TABLE t ($c0 INT, $c1 INT) USING $v2Source",
943943
errorMsg
944944
)
945-
testCreateAnalysisError(
945+
assertAnalysisError(
946946
s"CREATE TABLE testcat.t ($c0 INT, $c1 INT) USING $v2Source",
947947
errorMsg
948948
)
949-
testCreateAnalysisError(
949+
assertAnalysisError(
950950
s"CREATE OR REPLACE TABLE t ($c0 INT, $c1 INT) USING $v2Source",
951951
errorMsg
952952
)
953-
testCreateAnalysisError(
953+
assertAnalysisError(
954954
s"CREATE OR REPLACE TABLE testcat.t ($c0 INT, $c1 INT) USING $v2Source",
955955
errorMsg
956956
)
@@ -962,19 +962,19 @@ class DataSourceV2SQLSuite
962962
val errorMsg = "Found duplicate column(s) in the table definition of `t`"
963963
Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
964964
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
965-
testCreateAnalysisError(
965+
assertAnalysisError(
966966
s"CREATE TABLE t (d struct<$c0: INT, $c1: INT>) USING $v2Source",
967967
errorMsg
968968
)
969-
testCreateAnalysisError(
969+
assertAnalysisError(
970970
s"CREATE TABLE testcat.t (d struct<$c0: INT, $c1: INT>) USING $v2Source",
971971
errorMsg
972972
)
973-
testCreateAnalysisError(
973+
assertAnalysisError(
974974
s"CREATE OR REPLACE TABLE t (d struct<$c0: INT, $c1: INT>) USING $v2Source",
975975
errorMsg
976976
)
977-
testCreateAnalysisError(
977+
assertAnalysisError(
978978
s"CREATE OR REPLACE TABLE testcat.t (d struct<$c0: INT, $c1: INT>) USING $v2Source",
979979
errorMsg
980980
)
@@ -984,20 +984,20 @@ class DataSourceV2SQLSuite
984984

985985
test("tableCreation: bucket column names not in table definition") {
986986
val errorMsg = "Couldn't find column c in"
987-
testCreateAnalysisError(
987+
assertAnalysisError(
988988
s"CREATE TABLE tbl (a int, b string) USING $v2Source CLUSTERED BY (c) INTO 4 BUCKETS",
989989
errorMsg
990990
)
991-
testCreateAnalysisError(
991+
assertAnalysisError(
992992
s"CREATE TABLE testcat.tbl (a int, b string) USING $v2Source CLUSTERED BY (c) INTO 4 BUCKETS",
993993
errorMsg
994994
)
995-
testCreateAnalysisError(
995+
assertAnalysisError(
996996
s"CREATE OR REPLACE TABLE tbl (a int, b string) USING $v2Source " +
997997
"CLUSTERED BY (c) INTO 4 BUCKETS",
998998
errorMsg
999999
)
1000-
testCreateAnalysisError(
1000+
assertAnalysisError(
10011001
s"CREATE OR REPLACE TABLE testcat.tbl (a int, b string) USING $v2Source " +
10021002
"CLUSTERED BY (c) INTO 4 BUCKETS",
10031003
errorMsg
@@ -1008,19 +1008,19 @@ class DataSourceV2SQLSuite
10081008
val errorMsg = "Found duplicate column(s) in the partitioning"
10091009
Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
10101010
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
1011-
testCreateAnalysisError(
1011+
assertAnalysisError(
10121012
s"CREATE TABLE t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)",
10131013
errorMsg
10141014
)
1015-
testCreateAnalysisError(
1015+
assertAnalysisError(
10161016
s"CREATE TABLE testcat.t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)",
10171017
errorMsg
10181018
)
1019-
testCreateAnalysisError(
1019+
assertAnalysisError(
10201020
s"CREATE OR REPLACE TABLE t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)",
10211021
errorMsg
10221022
)
1023-
testCreateAnalysisError(
1023+
assertAnalysisError(
10241024
s"CREATE OR REPLACE TABLE testcat.t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)",
10251025
errorMsg
10261026
)
@@ -1032,22 +1032,22 @@ class DataSourceV2SQLSuite
10321032
val errorMsg = "Found duplicate column(s) in the bucket definition"
10331033
Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
10341034
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
1035-
testCreateAnalysisError(
1035+
assertAnalysisError(
10361036
s"CREATE TABLE t ($c0 INT) USING $v2Source " +
10371037
s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS",
10381038
errorMsg
10391039
)
1040-
testCreateAnalysisError(
1040+
assertAnalysisError(
10411041
s"CREATE TABLE testcat.t ($c0 INT) USING $v2Source " +
10421042
s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS",
10431043
errorMsg
10441044
)
1045-
testCreateAnalysisError(
1045+
assertAnalysisError(
10461046
s"CREATE OR REPLACE TABLE t ($c0 INT) USING $v2Source " +
10471047
s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS",
10481048
errorMsg
10491049
)
1050-
testCreateAnalysisError(
1050+
assertAnalysisError(
10511051
s"CREATE OR REPLACE TABLE testcat.t ($c0 INT) USING $v2Source " +
10521052
s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS",
10531053
errorMsg
@@ -1120,7 +1120,7 @@ class DataSourceV2SQLSuite
11201120
}
11211121
}
11221122

1123-
test("Update: basic - update all") {
1123+
test("UPDATE TABLE") {
11241124
val t = "testcat.ns1.ns2.tbl"
11251125
withTable(t) {
11261126
sql(
@@ -1129,23 +1129,29 @@ class DataSourceV2SQLSuite
11291129
|USING foo
11301130
|PARTITIONED BY (id, p)
11311131
""".stripMargin)
1132-
sql(
1133-
s"""
1134-
|INSERT INTO $t
1135-
|VALUES (1L, 'Herry', 26, 1),
1136-
|(2L, 'Jack', 31, 2),
1137-
|(3L, 'Lisa', 28, 3),
1138-
|(4L, 'Frank', 33, 3)
1139-
""".stripMargin)
1132+
1133+
// UPDATE non-existing table
1134+
assertAnalysisError(
1135+
"UPDATE dummy SET name='abc'",
1136+
"Table not found")
1137+
1138+
// UPDATE non-existing column
1139+
assertAnalysisError(
1140+
s"UPDATE $t SET dummy='abc'",
1141+
"cannot resolve")
1142+
assertAnalysisError(
1143+
s"UPDATE $t SET name='abc' WHERE dummy=1",
1144+
"cannot resolve")
1145+
1146+
// UPDATE is not implemented yet.
1147+
val e = intercept[UnsupportedOperationException] {
1148+
sql(s"UPDATE $t SET name='Robert', age=32 WHERE p=1")
1149+
}
1150+
assert(e.getMessage.contains("UPDATE TABLE is not supported temporarily"))
11401151
}
1141-
val errMsg = "UPDATE TABLE is not supported temporarily"
1142-
testCreateAnalysisError(
1143-
s"UPDATE $t SET name='Robert', age=32",
1144-
errMsg
1145-
)
11461152
}
11471153

1148-
private def testCreateAnalysisError(sqlStatement: String, expectedError: String): Unit = {
1154+
private def assertAnalysisError(sqlStatement: String, expectedError: String): Unit = {
11491155
val errMsg = intercept[AnalysisException] {
11501156
sql(sqlStatement)
11511157
}.getMessage

0 commit comments

Comments
 (0)