Skip to content

Commit a997070

Browse files
cloud-fanAlexander Tronchin-James
authored andcommitted
[SPARK-28892][SQL][FOLLOWUP] add resolved logical plan for UPDATE TABLE
### What changes were proposed in this pull request? Add back the resolved logical plan for UPDATE TABLE. It was in apache#25626 before but was removed later. ### Why are the changes needed? In apache#25626 , we decided to not add the update API in DS v2, but we still want to implement UPDATE for builtin source like JDBC. We should at least add the resolved logical plan. ### Does this PR introduce any user-facing change? no, UPDATE is still not supported yet. ### How was this patch tested? new tests. Closes apache#26025 from cloud-fan/update. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Xiao Li <gatorsmile@gmail.com>
1 parent 9e18859 commit a997070

File tree

10 files changed

+154
-59
lines changed

10 files changed

+154
-59
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1696,8 +1696,8 @@ class Analyzer(
16961696
// Only a few unary nodes (Project/Filter/Aggregate) can contain subqueries.
16971697
case q: UnaryNode if q.childrenResolved =>
16981698
resolveSubQueries(q, q.children)
1699-
case d: DeleteFromTable if d.childrenResolved =>
1700-
resolveSubQueries(d, d.children)
1699+
case s: SupportsSubquery if s.childrenResolved =>
1700+
resolveSubQueries(s, s.children)
17011701
}
17021702
}
17031703

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -593,19 +593,19 @@ trait CheckAnalysis extends PredicateHelper {
593593
// Only certain operators are allowed to host subquery expression containing
594594
// outer references.
595595
plan match {
596-
case _: Filter | _: Aggregate | _: Project | _: DeleteFromTable => // Ok
596+
case _: Filter | _: Aggregate | _: Project | _: SupportsSubquery => // Ok
597597
case other => failAnalysis(
598598
"Correlated scalar sub-queries can only be used in a " +
599-
s"Filter/Aggregate/Project: $plan")
599+
s"Filter/Aggregate/Project and a few commands: $plan")
600600
}
601601
}
602602

603603
case inSubqueryOrExistsSubquery =>
604604
plan match {
605-
case _: Filter | _: DeleteFromTable => // Ok
605+
case _: Filter | _: SupportsSubquery => // Ok
606606
case _ =>
607607
failAnalysis(s"IN/EXISTS predicate sub-queries can only be used in" +
608-
s" Filter/DeleteFromTable: $plan")
608+
s" Filter and a few commands: $plan")
609609
}
610610
}
611611

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,12 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
9696
val aliased = tableAlias.map(SubqueryAlias(_, r)).getOrElse(r)
9797
DeleteFromTable(aliased, condition)
9898

99-
case update: UpdateTableStatement =>
100-
throw new AnalysisException(s"UPDATE TABLE is not supported temporarily.")
99+
case u @ UpdateTableStatement(
100+
nameParts @ CatalogAndIdentifierParts(catalog, tableName), _, _, _, _) =>
101+
val r = UnresolvedV2Relation(nameParts, catalog.asTableCatalog, tableName.asIdentifier)
102+
val aliased = u.tableAlias.map(SubqueryAlias(_, r)).getOrElse(r)
103+
val columns = u.columns.map(UnresolvedAttribute(_))
104+
UpdateTable(aliased, columns, u.values, u.condition)
101105

102106
case DescribeTableStatement(
103107
nameParts @ NonSessionCatalog(catalog, tableName), partitionSpec, isExtended) =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,16 @@ object IntegerLiteral {
233233
}
234234
}
235235

236+
/**
237+
* Extractor for retrieving String literals.
238+
*/
239+
object StringLiteral {
240+
def unapply(a: Any): Option[String] = a match {
241+
case Literal(s: UTF8String, StringType) => Some(s.toString)
242+
case _ => None
243+
}
244+
}
245+
236246
/**
237247
* Extractor for and other utility methods for decimal literals.
238248
*/

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -599,10 +599,17 @@ case class DescribeTable(table: NamedRelation, isExtended: Boolean) extends Comm
599599
}
600600

601601
case class DeleteFromTable(
602-
child: LogicalPlan,
603-
condition: Option[Expression]) extends Command {
602+
table: LogicalPlan,
603+
condition: Option[Expression]) extends Command with SupportsSubquery {
604+
override def children: Seq[LogicalPlan] = table :: Nil
605+
}
604606

605-
override def children: Seq[LogicalPlan] = child :: Nil
607+
case class UpdateTable(
608+
table: LogicalPlan,
609+
columns: Seq[Expression],
610+
values: Seq[Expression],
611+
condition: Option[Expression]) extends Command with SupportsSubquery {
612+
override def children: Seq[LogicalPlan] = table :: Nil
606613
}
607614

608615
/**
@@ -1241,6 +1248,12 @@ case class Deduplicate(
12411248
override def output: Seq[Attribute] = child.output
12421249
}
12431250

1251+
/**
1252+
* A trait to represent the commands that support subqueries.
1253+
* This is used to whitelist such commands in the subquery-related checks.
1254+
*/
1255+
trait SupportsSubquery extends LogicalPlan
1256+
12441257
/** A trait used for logical plan nodes that create or replace V2 table definitions. */
12451258
trait V2CreateTablePlan extends LogicalPlan {
12461259
def tableName: Identifier

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/UpdateTableStatement.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,6 @@ import org.apache.spark.sql.catalyst.expressions.Expression
2222
case class UpdateTableStatement(
2323
tableName: Seq[String],
2424
tableAlias: Option[String],
25-
attrs: Seq[Seq[String]],
25+
columns: Seq[Seq[String]],
2626
values: Seq[Expression],
2727
condition: Option[Expression]) extends ParsedStatement

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -532,7 +532,7 @@ class AnalysisErrorSuite extends AnalysisTest {
532532
Seq(a, Alias(InSubquery(Seq(a), ListQuery(LocalRelation(b))), "c")()),
533533
LocalRelation(a))
534534
assertAnalysisError(plan, "Predicate sub-queries can only be used" +
535-
" in Filter/DeleteFromTable" :: Nil)
535+
" in Filter" :: Nil)
536536
}
537537

538538
test("PredicateSubQuery is used is a nested condition") {

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -745,6 +745,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
745745
case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
746746
case r: LogicalRDD =>
747747
RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil
748+
case _: UpdateTable =>
749+
throw new UnsupportedOperationException(s"UPDATE TABLE is not supported temporarily.")
748750
case _ => Nil
749751
}
750752
}

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

Lines changed: 41 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -938,19 +938,19 @@ class DataSourceV2SQLSuite
938938
val errorMsg = "Found duplicate column(s) in the table definition of `t`"
939939
Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
940940
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
941-
testCreateAnalysisError(
941+
assertAnalysisError(
942942
s"CREATE TABLE t ($c0 INT, $c1 INT) USING $v2Source",
943943
errorMsg
944944
)
945-
testCreateAnalysisError(
945+
assertAnalysisError(
946946
s"CREATE TABLE testcat.t ($c0 INT, $c1 INT) USING $v2Source",
947947
errorMsg
948948
)
949-
testCreateAnalysisError(
949+
assertAnalysisError(
950950
s"CREATE OR REPLACE TABLE t ($c0 INT, $c1 INT) USING $v2Source",
951951
errorMsg
952952
)
953-
testCreateAnalysisError(
953+
assertAnalysisError(
954954
s"CREATE OR REPLACE TABLE testcat.t ($c0 INT, $c1 INT) USING $v2Source",
955955
errorMsg
956956
)
@@ -962,19 +962,19 @@ class DataSourceV2SQLSuite
962962
val errorMsg = "Found duplicate column(s) in the table definition of `t`"
963963
Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
964964
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
965-
testCreateAnalysisError(
965+
assertAnalysisError(
966966
s"CREATE TABLE t (d struct<$c0: INT, $c1: INT>) USING $v2Source",
967967
errorMsg
968968
)
969-
testCreateAnalysisError(
969+
assertAnalysisError(
970970
s"CREATE TABLE testcat.t (d struct<$c0: INT, $c1: INT>) USING $v2Source",
971971
errorMsg
972972
)
973-
testCreateAnalysisError(
973+
assertAnalysisError(
974974
s"CREATE OR REPLACE TABLE t (d struct<$c0: INT, $c1: INT>) USING $v2Source",
975975
errorMsg
976976
)
977-
testCreateAnalysisError(
977+
assertAnalysisError(
978978
s"CREATE OR REPLACE TABLE testcat.t (d struct<$c0: INT, $c1: INT>) USING $v2Source",
979979
errorMsg
980980
)
@@ -984,20 +984,20 @@ class DataSourceV2SQLSuite
984984

985985
test("tableCreation: bucket column names not in table definition") {
986986
val errorMsg = "Couldn't find column c in"
987-
testCreateAnalysisError(
987+
assertAnalysisError(
988988
s"CREATE TABLE tbl (a int, b string) USING $v2Source CLUSTERED BY (c) INTO 4 BUCKETS",
989989
errorMsg
990990
)
991-
testCreateAnalysisError(
991+
assertAnalysisError(
992992
s"CREATE TABLE testcat.tbl (a int, b string) USING $v2Source CLUSTERED BY (c) INTO 4 BUCKETS",
993993
errorMsg
994994
)
995-
testCreateAnalysisError(
995+
assertAnalysisError(
996996
s"CREATE OR REPLACE TABLE tbl (a int, b string) USING $v2Source " +
997997
"CLUSTERED BY (c) INTO 4 BUCKETS",
998998
errorMsg
999999
)
1000-
testCreateAnalysisError(
1000+
assertAnalysisError(
10011001
s"CREATE OR REPLACE TABLE testcat.tbl (a int, b string) USING $v2Source " +
10021002
"CLUSTERED BY (c) INTO 4 BUCKETS",
10031003
errorMsg
@@ -1008,19 +1008,19 @@ class DataSourceV2SQLSuite
10081008
val errorMsg = "Found duplicate column(s) in the partitioning"
10091009
Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
10101010
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
1011-
testCreateAnalysisError(
1011+
assertAnalysisError(
10121012
s"CREATE TABLE t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)",
10131013
errorMsg
10141014
)
1015-
testCreateAnalysisError(
1015+
assertAnalysisError(
10161016
s"CREATE TABLE testcat.t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)",
10171017
errorMsg
10181018
)
1019-
testCreateAnalysisError(
1019+
assertAnalysisError(
10201020
s"CREATE OR REPLACE TABLE t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)",
10211021
errorMsg
10221022
)
1023-
testCreateAnalysisError(
1023+
assertAnalysisError(
10241024
s"CREATE OR REPLACE TABLE testcat.t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)",
10251025
errorMsg
10261026
)
@@ -1032,22 +1032,22 @@ class DataSourceV2SQLSuite
10321032
val errorMsg = "Found duplicate column(s) in the bucket definition"
10331033
Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
10341034
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
1035-
testCreateAnalysisError(
1035+
assertAnalysisError(
10361036
s"CREATE TABLE t ($c0 INT) USING $v2Source " +
10371037
s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS",
10381038
errorMsg
10391039
)
1040-
testCreateAnalysisError(
1040+
assertAnalysisError(
10411041
s"CREATE TABLE testcat.t ($c0 INT) USING $v2Source " +
10421042
s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS",
10431043
errorMsg
10441044
)
1045-
testCreateAnalysisError(
1045+
assertAnalysisError(
10461046
s"CREATE OR REPLACE TABLE t ($c0 INT) USING $v2Source " +
10471047
s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS",
10481048
errorMsg
10491049
)
1050-
testCreateAnalysisError(
1050+
assertAnalysisError(
10511051
s"CREATE OR REPLACE TABLE testcat.t ($c0 INT) USING $v2Source " +
10521052
s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS",
10531053
errorMsg
@@ -1120,7 +1120,7 @@ class DataSourceV2SQLSuite
11201120
}
11211121
}
11221122

1123-
test("Update: basic - update all") {
1123+
test("UPDATE TABLE") {
11241124
val t = "testcat.ns1.ns2.tbl"
11251125
withTable(t) {
11261126
sql(
@@ -1129,23 +1129,29 @@ class DataSourceV2SQLSuite
11291129
|USING foo
11301130
|PARTITIONED BY (id, p)
11311131
""".stripMargin)
1132-
sql(
1133-
s"""
1134-
|INSERT INTO $t
1135-
|VALUES (1L, 'Herry', 26, 1),
1136-
|(2L, 'Jack', 31, 2),
1137-
|(3L, 'Lisa', 28, 3),
1138-
|(4L, 'Frank', 33, 3)
1139-
""".stripMargin)
1132+
1133+
// UPDATE non-existing table
1134+
assertAnalysisError(
1135+
"UPDATE dummy SET name='abc'",
1136+
"Table not found")
1137+
1138+
// UPDATE non-existing column
1139+
assertAnalysisError(
1140+
s"UPDATE $t SET dummy='abc'",
1141+
"cannot resolve")
1142+
assertAnalysisError(
1143+
s"UPDATE $t SET name='abc' WHERE dummy=1",
1144+
"cannot resolve")
1145+
1146+
// UPDATE is not implemented yet.
1147+
val e = intercept[UnsupportedOperationException] {
1148+
sql(s"UPDATE $t SET name='Robert', age=32 WHERE p=1")
1149+
}
1150+
assert(e.getMessage.contains("UPDATE TABLE is not supported temporarily"))
11401151
}
1141-
val errMsg = "UPDATE TABLE is not supported temporarily"
1142-
testCreateAnalysisError(
1143-
s"UPDATE $t SET name='Robert', age=32",
1144-
errMsg
1145-
)
11461152
}
11471153

1148-
private def testCreateAnalysisError(sqlStatement: String, expectedError: String): Unit = {
1154+
private def assertAnalysisError(sqlStatement: String, expectedError: String): Unit = {
11491155
val errMsg = intercept[AnalysisException] {
11501156
sql(sqlStatement)
11511157
}.getMessage

0 commit comments

Comments
 (0)