Skip to content

Commit 3732a18

Browse files
author
xy_xin
committed
Add UPDATE support for DataSource V2
1 parent 76ebf22 commit 3732a18

File tree

8 files changed

+152
-12
lines changed

8 files changed

+152
-12
lines changed

docs/sql-keywords.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,7 @@ Below is a list of all the keywords in Spark SQL.
280280
<tr><td>UNKNOWN</td><td>reserved</td><td>non-reserved</td><td>reserved</td></tr>
281281
<tr><td>UNLOCK</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
282282
<tr><td>UNSET</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
283+
<tr><td>UPDATE</td><td>non-reserved</td><td>non-reserved</td><td>reserved</td></tr>
283284
<tr><td>USE</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
284285
<tr><td>USER</td><td>reserved</td><td>non-reserved</td><td>reserved</td></tr>
285286
<tr><td>USING</td><td>reserved</td><td>strict-non-reserved</td><td>reserved</td></tr>

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ statement
217217
| SET .*? #setConfiguration
218218
| RESET #resetConfiguration
219219
| DELETE FROM multipartIdentifier tableAlias whereClause? #deleteFromTable
220+
| UPDATE multipartIdentifier tableAlias setClause whereClause? #updateTable
220221
| unsupportedHiveNativeCommands .*? #failNativeCommand
221222
;
222223

@@ -476,6 +477,14 @@ selectClause
476477
: SELECT (hints+=hint)* setQuantifier? namedExpressionSeq
477478
;
478479

480+
setClause
481+
: SET assign (',' assign)*
482+
;
483+
484+
assign
485+
: key=multipartIdentifier EQ value=expression
486+
;
487+
479488
whereClause
480489
: WHERE booleanExpression
481490
;
@@ -1085,6 +1094,7 @@ ansiNonReserved
10851094
| UNCACHE
10861095
| UNLOCK
10871096
| UNSET
1097+
| UPDATE
10881098
| USE
10891099
| VALUES
10901100
| VIEW
@@ -1355,6 +1365,7 @@ nonReserved
13551365
| UNKNOWN
13561366
| UNLOCK
13571367
| UNSET
1368+
| UPDATE
13581369
| USE
13591370
| USER
13601371
| VALUES
@@ -1622,6 +1633,7 @@ UNIQUE: 'UNIQUE';
16221633
UNKNOWN: 'UNKNOWN';
16231634
UNLOCK: 'UNLOCK';
16241635
UNSET: 'UNSET';
1636+
UPDATE: 'UPDATE';
16251637
USE: 'USE';
16261638
USER: 'USER';
16271639
USING: 'USING';

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
3636
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
3737
import org.apache.spark.sql.catalyst.plans._
3838
import org.apache.spark.sql.catalyst.plans.logical._
39-
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement}
39+
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement, UpdateTableStatement}
4040
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
4141
import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
4242
import org.apache.spark.sql.internal.SQLConf
@@ -361,6 +361,36 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
361361
DeleteFromStatement(tableId, tableAlias, predicate)
362362
}
363363

364+
/**
 * Create an [[UpdateTableStatement]] for an UPDATE statement:
 * {{{ UPDATE table [AS alias] SET col = value [, ...] [WHERE predicate] }}}
 */
override def visitUpdateTable(ctx: UpdateTableContext): LogicalPlan = withOrigin(ctx) {
  val table = visitMultipartIdentifier(ctx.multipartIdentifier)

  // Resolve the optional table alias. Column aliases (e.g. "AS t(a, b)") are not
  // supported for UPDATE and are rejected with a parse error.
  val alias = Option(ctx.tableAlias()).flatMap { aliasCtx =>
    if (aliasCtx.identifierList() != null) {
      throw new ParseException("Columns aliases is not allowed in UPDATE.",
        aliasCtx.identifierList())
    }
    Option(aliasCtx.strictIdentifier()).map(_.getText)
  }

  // Each SET assignment contributes one target column (a multi-part name) and one
  // new value expression; the two sequences stay parallel, position by position.
  val assignments = ctx.setClause().assign().asScala
  val columns = assignments.map(assign => visitMultipartIdentifier(assign.key))
  val newValues = assignments.map(assign => expression(assign.value))

  // Optional WHERE clause restricting which rows are updated.
  val condition = Option(ctx.whereClause()).map(w => expression(w.booleanExpression()))

  UpdateTableStatement(
    table,
    alias,
    columns,
    newValues,
    condition)
}
393+
364394
/**
365395
* Create a partition specification map.
366396
*/
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.plans.logical.sql
19+
20+
import org.apache.spark.sql.catalyst.expressions.Expression
21+
22+
/**
 * A parsed UPDATE TABLE statement, produced by the parser and resolved later
 * during analysis (see [[ParsedStatement]]).
 *
 * @param tableName  multi-part identifier of the table being updated
 * @param tableAlias optional alias declared for the target table
 * @param attrs      multi-part names of the columns assigned in the SET clause;
 *                   kept parallel to `values` (same length, matching positions)
 * @param values     expressions assigned to the columns in `attrs`, position by position
 * @param condition  optional WHERE predicate selecting the rows to update;
 *                   `None` means every row is updated
 */
case class UpdateTableStatement(
    tableName: Seq[String],
    tableAlias: Option[String],
    attrs: Seq[Seq[String]],
    values: Seq[Expression],
    condition: Option[Expression]) extends ParsedStatement

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAttribute
2424
import org.apache.spark.sql.catalyst.catalog.BucketSpec
2525
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
2626
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
27-
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement}
27+
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement, UpdateTableStatement}
2828
import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
2929
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
3030
import org.apache.spark.unsafe.types.UTF8String
@@ -789,6 +789,48 @@ class DDLParserSuite extends AnalysisTest {
789789
assert(exc.getMessage.contains("Columns aliases is not allowed in DELETE."))
790790
}
791791

792+
test("update table: basic") {
  // Plain UPDATE: no table alias and no WHERE clause, so both are None.
  val sql =
    """
      |UPDATE testcat.ns1.ns2.tbl
      |SET t.a='Robert', t.b=32
    """.stripMargin
  val expected = UpdateTableStatement(
    Seq("testcat", "ns1", "ns2", "tbl"),
    None,
    Seq(Seq("t", "a"), Seq("t", "b")),
    Seq(Literal("Robert"), Literal(32)),
    None)
  parseCompare(sql, expected)
}
805+
806+
test("update table: with alias and where clause") {
  // UPDATE with a table alias and a WHERE predicate: both should survive parsing.
  val sql =
    """
      |UPDATE testcat.ns1.ns2.tbl AS t
      |SET t.a='Robert', t.b=32
      |WHERE t.c=2
    """.stripMargin
  val expected = UpdateTableStatement(
    Seq("testcat", "ns1", "ns2", "tbl"),
    Some("t"),
    Seq(Seq("t", "a"), Seq("t", "b")),
    Seq(Literal("Robert"), Literal(32)),
    Some(EqualTo(UnresolvedAttribute("t.c"), Literal(2))))
  parseCompare(sql, expected)
}
820+
821+
test("update table: columns aliases is not allowed") {
  // Column aliases after the table alias (e.g. "t(a,b,c,d)") must be rejected
  // by the parser with a dedicated error message.
  val sql =
    """
      |UPDATE testcat.ns1.ns2.tbl AS t(a,b,c,d)
      |SET b='Robert', c=32
      |WHERE d=2
    """.stripMargin
  val exc = intercept[ParseException](parsePlan(sql))
  assert(exc.getMessage.contains("Columns aliases is not allowed in UPDATE."))
}
833+
792834
test("show tables") {
793835
comparePlans(
794836
parsePlan("SHOW TABLES"),

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
2424
import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedRelation}
2525
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils}
2626
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, ShowNamespaces, ShowTables, SubqueryAlias}
27-
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement}
27+
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement, UpdateTableStatement}
2828
import org.apache.spark.sql.catalyst.rules.Rule
2929
import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, LookupCatalog, TableCatalog}
3030
import org.apache.spark.sql.connector.expressions.Transform
@@ -187,6 +187,9 @@ case class DataSourceResolution(
187187
s"No v2 catalog is available for ${namespace.quoted}")
188188
}
189189

190+
case update: UpdateTableStatement =>
191+
throw new AnalysisException(s"Update table is not supported temporarily.")
192+
190193
case ShowTablesStatement(None, pattern) =>
191194
defaultCatalog match {
192195
case Some(catalog) =>

0 commit comments

Comments (0)