
Commit ab92e17

imback82 authored and viirya committed
[SPARK-29512][SQL] REPAIR TABLE should look up catalog/table like v2 commands
### What changes were proposed in this pull request?

Add `RepairTableStatement` and make REPAIR TABLE go through the same catalog/table resolution framework as v2 commands.

### Why are the changes needed?

It's important for all commands to have the same table-resolution behavior, to avoid confusing end users. For example:

```
USE my_catalog
DESC t              // succeeds, describing the table t from my_catalog
MSCK REPAIR TABLE t // reports table not found, as there is no table t in the session catalog
```

### Does this PR introduce any user-facing change?

Yes. When running MSCK REPAIR TABLE, Spark fails the command if the current catalog is set to a v2 catalog, or if the table name specifies a v2 catalog.

### How was this patch tested?

New unit tests.

Closes #26168 from imback82/repair_table.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Liang-Chi Hsieh <liangchi@uber.com>
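
To make the user-facing change concrete, here is a hypothetical spark-shell session mirroring the new `DataSourceV2SQLSuite` test below (`testcat` is assumed to be a registered v2 test catalog, and `USING foo` an illustrative provider):

```scala
// Assumes spark.sql.catalog.testcat is configured with a v2 TableCatalog.
spark.sql("CREATE TABLE testcat.ns1.ns2.tbl (id bigint, data string) USING foo")

// After this patch, the name resolves to the v2 catalog and the command is
// rejected, instead of being looked up in the session catalog:
spark.sql("MSCK REPAIR TABLE testcat.ns1.ns2.tbl")
// org.apache.spark.sql.AnalysisException:
//   MSCK REPAIR TABLE is only supported with v1 tables.
```
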
1 parent 2437878 commit ab92e17

8 files changed: +56 −36 lines changed


sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 1 addition & 1 deletion

@@ -212,7 +212,7 @@ statement
     | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
         tableIdentifier partitionSpec?                             #loadData
     | TRUNCATE TABLE tableIdentifier partitionSpec?                #truncateTable
-    | MSCK REPAIR TABLE tableIdentifier                            #repairTable
+    | MSCK REPAIR TABLE multipartIdentifier                        #repairTable
     | op=(ADD | LIST) identifier .*?                               #manageResource
     | SET ROLE .*?                                                 #failNativeCommand
     | SET .*?                                                      #setConfiguration
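
The switch from `tableIdentifier` to `multipartIdentifier` is what lets the parser accept catalog-qualified names. A minimal sketch of the resulting parse, mirroring the new `DDLParserSuite` test further down (the three-part name `a.b.c` is illustrative):

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.RepairTableStatement

// A multi-part name now parses into a RepairTableStatement carrying all
// name parts, instead of being forced through the two-part tableIdentifier.
val plan = CatalystSqlParser.parsePlan("MSCK REPAIR TABLE a.b.c")
assert(plan == RepairTableStatement(Seq("a", "b", "c")))
```
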

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 12 additions & 0 deletions

@@ -2716,4 +2716,16 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
         tableName, Option(visitIdentifierSeq(ctx.identifierSeq())), allColumns = false)
     }
   }
+
+  /**
+   * Create a [[RepairTableStatement]].
+   *
+   * For example:
+   * {{{
+   *   MSCK REPAIR TABLE multi_part_name
+   * }}}
+   */
+  override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
+    RepairTableStatement(visitMultipartIdentifier(ctx.multipartIdentifier()))
+  }
 }

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala

Lines changed: 5 additions & 0 deletions

@@ -311,3 +311,8 @@ case class AnalyzeColumnStatement(
   require(columnNames.isDefined ^ allColumns, "Parameter `columnNames` or `allColumns` are " +
     "mutually exclusive. Only one of them should be specified.")
 }
+
+/**
+ * A REPAIR TABLE statement, as parsed from SQL
+ */
+case class RepairTableStatement(tableName: Seq[String]) extends ParsedStatement
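
For context, `ParsedStatement` (defined earlier in this same file) is the common base class for these parsed-but-unresolved plans. Roughly, it looks like the sketch below (simplified from Spark's actual definition): it is unresolved by construction, so a `RepairTableStatement` that no analyzer rule converts will fail analysis rather than silently execute.

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Simplified sketch: no output, no children, and never resolved, which
// forces a rule such as ResolveSessionCatalog to rewrite the statement
// into a concrete command.
abstract class ParsedStatement extends LogicalPlan {
  override def output: Seq[Attribute] = Seq.empty
  override def children: Seq[LogicalPlan] = Seq.empty
  final override lazy val resolved = false
}
```
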

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala

Lines changed: 6 additions & 0 deletions

@@ -955,6 +955,12 @@ class DDLParserSuite extends AnalysisTest {
       "missing 'COLUMNS' at '<EOF>'")
   }
 
+  test("MSCK REPAIR table") {
+    comparePlans(
+      parsePlan("MSCK REPAIR TABLE a.b.c"),
+      RepairTableStatement(Seq("a", "b", "c")))
+  }
+
   private case class TableSpec(
       name: Seq[String],
       schema: Option[StructType],

sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala

Lines changed: 20 additions & 12 deletions

@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, TableChange, V1Table}
 import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand}
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableRecoverPartitionsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand}
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.internal.SQLConf

@@ -266,22 +266,30 @@
       ShowTablesCommand(None, pattern)
 
     case AnalyzeTableStatement(tableName, partitionSpec, noScan) =>
-      val CatalogAndIdentifierParts(catalog, parts) = tableName
-      if (!isSessionCatalog(catalog)) {
-        throw new AnalysisException("ANALYZE TABLE is only supported with v1 tables.")
-      }
+      val v1TableName = parseV1Table(tableName, "ANALYZE TABLE")
       if (partitionSpec.isEmpty) {
-        AnalyzeTableCommand(parts.asTableIdentifier, noScan)
+        AnalyzeTableCommand(v1TableName.asTableIdentifier, noScan)
       } else {
-        AnalyzePartitionCommand(parts.asTableIdentifier, partitionSpec, noScan)
+        AnalyzePartitionCommand(v1TableName.asTableIdentifier, partitionSpec, noScan)
       }
 
     case AnalyzeColumnStatement(tableName, columnNames, allColumns) =>
-      val CatalogAndIdentifierParts(catalog, parts) = tableName
-      if (!isSessionCatalog(catalog)) {
-        throw new AnalysisException("ANALYZE TABLE is only supported with v1 tables.")
-      }
-      AnalyzeColumnCommand(parts.asTableIdentifier, columnNames, allColumns)
+      val v1TableName = parseV1Table(tableName, "ANALYZE TABLE")
+      AnalyzeColumnCommand(v1TableName.asTableIdentifier, columnNames, allColumns)
+
+    case RepairTableStatement(tableName) =>
+      val v1TableName = parseV1Table(tableName, "MSCK REPAIR TABLE")
+      AlterTableRecoverPartitionsCommand(
+        v1TableName.asTableIdentifier,
+        "MSCK REPAIR TABLE")
+  }
+
+  private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = {
+    val CatalogAndIdentifierParts(catalog, parts) = tableName
+    if (!isSessionCatalog(catalog)) {
+      throw new AnalysisException(s"$sql is only supported with v1 tables.")
+    }
+    parts
   }
 
   private def buildCatalogTable(
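
Putting the parser and resolver changes together, the expected end-to-end behavior can be sketched as a hypothetical test (`db.tbl` is an illustrative session-catalog name, `analyze` stands in for a parse-then-resolve helper, and the v2 case mirrors the `DataSourceV2SQLSuite` test below):

```scala
// Session-catalog name: the statement is rewritten into the v1 command.
comparePlans(
  analyze("MSCK REPAIR TABLE db.tbl"),
  AlterTableRecoverPartitionsCommand(
    TableIdentifier("tbl", Some("db")),
    "MSCK REPAIR TABLE"))

// v2 catalog name: parseV1Table rejects it during resolution.
val e = intercept[AnalysisException] {
  analyze("MSCK REPAIR TABLE testcat.ns.tbl")
}
assert(e.getMessage.contains("MSCK REPAIR TABLE is only supported with v1 tables"))
```
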

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 0 additions & 14 deletions

@@ -360,20 +360,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
       Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
   }
 
-  /**
-   * Create a [[AlterTableRecoverPartitionsCommand]] command.
-   *
-   * For example:
-   * {{{
-   *   MSCK REPAIR TABLE tablename
-   * }}}
-   */
-  override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableRecoverPartitionsCommand(
-      visitTableIdentifier(ctx.tableIdentifier),
-      "MSCK REPAIR TABLE")
-  }
-
   /**
    * Create a [[CreateDatabaseCommand]] command.
    *

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

Lines changed: 12 additions & 0 deletions

@@ -1198,6 +1198,18 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("MSCK REPAIR TABLE") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+
+      val e = intercept[AnalysisException] {
+        sql(s"MSCK REPAIR TABLE $t")
+      }
+      assert(e.message.contains("MSCK REPAIR TABLE is only supported with v1 tables"))
+    }
+  }
+
   private def assertAnalysisError(sqlStatement: String, expectedError: String): Unit = {
     val errMsg = intercept[AnalysisException] {
       sql(sqlStatement)

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala

Lines changed: 0 additions & 9 deletions

@@ -1444,15 +1444,6 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
     intercept(sql2, "Found duplicate clauses: TBLPROPERTIES")
   }
 
-  test("MSCK REPAIR table") {
-    val sql = "MSCK REPAIR TABLE tab1"
-    val parsed = parser.parsePlan(sql)
-    val expected = AlterTableRecoverPartitionsCommand(
-      TableIdentifier("tab1", None),
-      "MSCK REPAIR TABLE")
-    comparePlans(parsed, expected)
-  }
-
   test("create table like") {
     val v1 = "CREATE TABLE table1 LIKE table2"
     val (target, source, location, exists) = parser.parsePlan(v1).collect {
