Skip to content

Commit a44b00d

Browse files
jzhugecloud-fan
authored andcommitted
[SPARK-27813][SQL] DataSourceV2: Add DropTable logical operation
## What changes were proposed in this pull request? Support DROP TABLE from V2 catalogs. Move DROP TABLE into catalyst. Move parsing tests for DROP TABLE/VIEW to PlanResolutionSuite to validate existing behavior. Add new tests fo catalyst parser suite. Separate DROP VIEW into different code path from DROP TABLE. Move DROP VIEW into catalyst as a new operator. Add a meaningful exception to indicate view is not currently supported in v2 catalog. ## How was this patch tested? New unit tests. Existing unit tests in catalyst and sql core. Closes #24686 from jzhuge/SPARK-27813-pr. Authored-by: John Zhuge <jzhuge@apache.org> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent bd87323 commit a44b00d

File tree

14 files changed

+294
-81
lines changed

14 files changed

+294
-81
lines changed

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,8 @@ statement
139139
DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* #dropTablePartitions
140140
| ALTER TABLE tableIdentifier partitionSpec? SET locationSpec #setTableLocation
141141
| ALTER TABLE tableIdentifier RECOVER PARTITIONS #recoverPartitions
142-
| DROP TABLE (IF EXISTS)? tableIdentifier PURGE? #dropTable
143-
| DROP VIEW (IF EXISTS)? tableIdentifier #dropTable
142+
| DROP TABLE (IF EXISTS)? multipartIdentifier PURGE? #dropTable
143+
| DROP VIEW (IF EXISTS)? multipartIdentifier #dropView
144144
| CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)?
145145
VIEW (IF NOT EXISTS)? tableIdentifier
146146
identifierCommentList?

sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222

2323
import java.util.Arrays;
2424
import java.util.Objects;
25+
import java.util.stream.Collectors;
26+
import java.util.stream.Stream;
2527

2628
/**
2729
* An {@link Identifier} implementation.
@@ -49,6 +51,21 @@ public String name() {
4951
return name;
5052
}
5153

54+
private String escapeQuote(String part) {
55+
if (part.contains("`")) {
56+
return part.replace("`", "``");
57+
} else {
58+
return part;
59+
}
60+
}
61+
62+
@Override
63+
public String toString() {
64+
return Stream.concat(Stream.of(namespace), Stream.of(name))
65+
.map(part -> '`' + escapeQuote(part) + '`')
66+
.collect(Collectors.joining("."));
67+
}
68+
5269
@Override
5370
public boolean equals(Object o) {
5471
if (this == o) {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
3838
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
3939
import org.apache.spark.sql.catalyst.plans._
4040
import org.apache.spark.sql.catalyst.plans.logical._
41-
import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement}
41+
import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement}
4242
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
4343
import org.apache.spark.sql.internal.SQLConf
4444
import org.apache.spark.sql.types._
@@ -2195,4 +2195,22 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
21952195
}
21962196
}
21972197

2198+
/**
2199+
* Create a [[DropTableStatement]] command.
2200+
*/
2201+
override def visitDropTable(ctx: DropTableContext): LogicalPlan = withOrigin(ctx) {
2202+
DropTableStatement(
2203+
visitMultipartIdentifier(ctx.multipartIdentifier()),
2204+
ctx.EXISTS != null,
2205+
ctx.PURGE != null)
2206+
}
2207+
2208+
/**
2209+
* Create a [[DropViewStatement]] command.
2210+
*/
2211+
override def visitDropView(ctx: DropViewContext): AnyRef = withOrigin(ctx) {
2212+
DropViewStatement(
2213+
visitMultipartIdentifier(ctx.multipartIdentifier()),
2214+
ctx.EXISTS != null)
2215+
}
21982216
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,14 @@ object OverwritePartitionsDynamic {
499499
}
500500
}
501501

502+
/**
503+
* Drop a table.
504+
*/
505+
case class DropTable(
506+
catalog: TableCatalog,
507+
ident: Identifier,
508+
ifExists: Boolean) extends Command
509+
502510

503511
/**
504512
* Insert some data into a table. Note that this plan is unresolved and has to be replaced by the
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.plans.logical.sql
19+
20+
import org.apache.spark.sql.catalyst.expressions.Attribute
21+
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
22+
23+
/**
24+
* A DROP TABLE statement, as parsed from SQL.
25+
*/
26+
case class DropTableStatement(
27+
tableName: Seq[String],
28+
ifExists: Boolean,
29+
purge: Boolean) extends ParsedStatement {
30+
31+
override def output: Seq[Attribute] = Seq.empty
32+
33+
override def children: Seq[LogicalPlan] = Seq.empty
34+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.plans.logical.sql
19+
20+
import org.apache.spark.sql.catalyst.expressions.Attribute
21+
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
22+
23+
/**
24+
* A DROP VIEW statement, as parsed from SQL.
25+
*/
26+
case class DropViewStatement(
27+
viewName: Seq[String],
28+
ifExists: Boolean) extends ParsedStatement {
29+
30+
override def output: Seq[Attribute] = Seq.empty
31+
32+
override def children: Seq[LogicalPlan] = Seq.empty
33+
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ package org.apache.spark.sql.catalyst.parser
2020
import org.apache.spark.sql.catalog.v2.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform}
2121
import org.apache.spark.sql.catalyst.analysis.AnalysisTest
2222
import org.apache.spark.sql.catalyst.catalog.BucketSpec
23-
import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement}
23+
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
24+
import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement}
2425
import org.apache.spark.sql.types.{IntegerType, StringType, StructType, TimestampType}
2526
import org.apache.spark.unsafe.types.UTF8String
2627

@@ -34,6 +35,10 @@ class DDLParserSuite extends AnalysisTest {
3435
}
3536
}
3637

38+
private def parseCompare(sql: String, expected: LogicalPlan): Unit = {
39+
comparePlans(parsePlan(sql), expected, checkAnalysis = false)
40+
}
41+
3742
test("create table using - schema") {
3843
val sql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet"
3944

@@ -362,4 +367,31 @@ class DDLParserSuite extends AnalysisTest {
362367
}
363368
}
364369
}
370+
371+
test("drop table") {
372+
parseCompare("DROP TABLE testcat.ns1.ns2.tbl",
373+
DropTableStatement(Seq("testcat", "ns1", "ns2", "tbl"), ifExists = false, purge = false))
374+
parseCompare(s"DROP TABLE db.tab",
375+
DropTableStatement(Seq("db", "tab"), ifExists = false, purge = false))
376+
parseCompare(s"DROP TABLE IF EXISTS db.tab",
377+
DropTableStatement(Seq("db", "tab"), ifExists = true, purge = false))
378+
parseCompare(s"DROP TABLE tab",
379+
DropTableStatement(Seq("tab"), ifExists = false, purge = false))
380+
parseCompare(s"DROP TABLE IF EXISTS tab",
381+
DropTableStatement(Seq("tab"), ifExists = true, purge = false))
382+
parseCompare(s"DROP TABLE tab PURGE",
383+
DropTableStatement(Seq("tab"), ifExists = false, purge = true))
384+
parseCompare(s"DROP TABLE IF EXISTS tab PURGE",
385+
DropTableStatement(Seq("tab"), ifExists = true, purge = true))
386+
}
387+
388+
test("drop view") {
389+
parseCompare(s"DROP VIEW testcat.db.view",
390+
DropViewStatement(Seq("testcat", "db", "view"), ifExists = false))
391+
parseCompare(s"DROP VIEW db.view", DropViewStatement(Seq("db", "view"), ifExists = false))
392+
parseCompare(s"DROP VIEW IF EXISTS db.view",
393+
DropViewStatement(Seq("db", "view"), ifExists = true))
394+
parseCompare(s"DROP VIEW view", DropViewStatement(Seq("view"), ifExists = false))
395+
parseCompare(s"DROP VIEW IF EXISTS view", DropViewStatement(Seq("view"), ifExists = true))
396+
}
365397
}

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -645,17 +645,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
645645
ctx.TEMPORARY != null)
646646
}
647647

648-
/**
649-
* Create a [[DropTableCommand]] command.
650-
*/
651-
override def visitDropTable(ctx: DropTableContext): LogicalPlan = withOrigin(ctx) {
652-
DropTableCommand(
653-
visitTableIdentifier(ctx.tableIdentifier),
654-
ctx.EXISTS != null,
655-
ctx.VIEW != null,
656-
ctx.PURGE != null)
657-
}
658-
659648
/**
660649
* Create a [[AlterTableRenameCommand]] command.
661650
*

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,10 @@ import org.apache.spark.sql.catalog.v2.expressions.Transform
2727
import org.apache.spark.sql.catalyst.TableIdentifier
2828
import org.apache.spark.sql.catalyst.analysis.CastSupport
2929
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils}
30-
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, LogicalPlan}
31-
import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement}
30+
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan}
31+
import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement}
3232
import org.apache.spark.sql.catalyst.rules.Rule
33+
import org.apache.spark.sql.execution.command.DropTableCommand
3334
import org.apache.spark.sql.internal.SQLConf
3435
import org.apache.spark.sql.sources.v2.TableProvider
3536
import org.apache.spark.sql.types.StructType
@@ -83,6 +84,20 @@ case class DataSourceResolution(
8384
s"No catalog specified for table ${identifier.quoted} and no default catalog is set"))
8485
.asTableCatalog
8586
convertCTAS(catalog, identifier, create)
87+
88+
case DropTableStatement(CatalogObjectIdentifier(Some(catalog), ident), ifExists, _) =>
89+
DropTable(catalog.asTableCatalog, ident, ifExists)
90+
91+
case DropTableStatement(AsTableIdentifier(tableName), ifExists, purge) =>
92+
DropTableCommand(tableName, ifExists, isView = false, purge)
93+
94+
case DropViewStatement(CatalogObjectIdentifier(Some(catalog), ident), _) =>
95+
throw new AnalysisException(
96+
s"Can not specify catalog `${catalog.name}` for view $ident " +
97+
s"because view support in catalog has not been implemented yet")
98+
99+
case DropViewStatement(AsTableIdentifier(tableName), ifExists) =>
100+
DropTableCommand(tableName, ifExists, isView = true, purge = false)
86101
}
87102

88103
object V1WriteProvider {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import scala.collection.mutable
2323
import org.apache.spark.sql.{AnalysisException, Strategy}
2424
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression}
2525
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
26-
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateV2Table, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition}
26+
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition}
2727
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
2828
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
2929
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
@@ -199,6 +199,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
199199
Nil
200200
}
201201

202+
case DropTable(catalog, ident, ifExists) =>
203+
DropTableExec(catalog, ident, ifExists) :: Nil
204+
202205
case _ => Nil
203206
}
204207
}

0 commit comments

Comments
 (0)