[SPARK-29513][SQL] REFRESH TABLE should look up catalog/table like v2 commands #26183
Changes from all commits: dc52212, fcc31f2, 7b884e8, fe577f5, 3a1da85, 31bfc6f
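For context, the "look up catalog/table like v2 commands" in the title refers to resolving a multi-part table name (e.g. `testcat.ns1.ns2.tbl`) against a registered v2 `TableCatalog` before falling back to the v1 session catalog. The sketch below is an illustrative simplification of that lookup, not the PR's actual parser/analyzer code; `V2LookupSketch` and its `resolve` method are invented names for the example.

    import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}

    object V2LookupSketch {
      // catalogs: v2 catalogs registered with the session, keyed by name (assumed input).
      // Returns Right((catalog, identifier)) for the v2 path, or Left(nameParts) to signal
      // the v1 session-catalog fallback.
      def resolve(
          nameParts: Seq[String],
          catalogs: Map[String, TableCatalog]): Either[Seq[String], (TableCatalog, Identifier)] = {
        nameParts match {
          case head +: rest if rest.nonEmpty && catalogs.contains(head) =>
            // v2 path: "testcat.ns1.ns2.tbl" -> (testcat, Identifier(Array("ns1", "ns2"), "tbl"))
            Right((catalogs(head), Identifier.of(rest.init.toArray, rest.last)))
          case _ =>
            // v1 fallback: leave the whole name for the session catalog to handle.
            Left(nameParts)
        }
      }
    }

With `REFRESH TABLE testcat.ns1.ns2.tbl`, the v2 path resolves `testcat` as the catalog and `ns1.ns2` / `tbl` as the identifier, which is what the new `RefreshTableExec` below operates on.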
RefreshTableExec.scala (new file, 33 lines):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}

case class RefreshTableExec(
    catalog: TableCatalog,
    ident: Identifier) extends V2CommandExec {
  override protected def run(): Seq[InternalRow] = {
    catalog.invalidateTable(ident)
    Seq.empty
  }

  override def output: Seq[Attribute] = Seq.empty
}
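`RefreshTableExec` simply forwards the identifier to `TableCatalog.invalidateTable`. As a rough, illustrative sketch of how such a `V2CommandExec` node is typically planned (the `RefreshTable` logical node and the strategy object below are assumptions for the example, not necessarily the exact classes added by this PR):

    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
    import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
    import org.apache.spark.sql.execution.SparkPlan
    import org.apache.spark.sql.execution.datasources.v2.RefreshTableExec

    // Assumed logical command produced by analysis for REFRESH TABLE on a v2 catalog.
    case class RefreshTable(catalog: TableCatalog, ident: Identifier) extends LeafNode {
      override def output: Seq[Attribute] = Nil
    }

    object RefreshTablePlanningSketch {
      // A planner strategy would pattern-match the resolved command and emit the exec node.
      def plan(plan: LogicalPlan): Seq[SparkPlan] = plan match {
        case RefreshTable(catalog, ident) => RefreshTableExec(catalog, ident) :: Nil
        case _ => Nil
      }
    }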
DataSourceV2SQLSuite.scala:

@@ -1112,6 +1112,20 @@ class DataSourceV2SQLSuite
     }
   }

+  test("REFRESH TABLE: v2 table") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+
+      val testCatalog = catalog("testcat").asTableCatalog.asInstanceOf[InMemoryTableCatalog]
+      val identifier = Identifier.of(Array("ns1", "ns2"), "tbl")
+
+      assert(!testCatalog.isTableInvalidated(identifier))
+      sql(s"REFRESH TABLE $t")
Contributor (Author): Any suggestion for a reasonable test here?

    sql(s"CREATE TABLE $t (id int, data string) USING $v2Format")
    sql(s"ALTER TABLE $t DROP COLUMN data")
    val table = getTableMetadata(t) // <- this calls TableCatalog.loadTable

Contributor: I think a reasonable test is to create a custom v2 implementation which implements

Contributor: We can implement

Contributor (Author): Thanks for the suggestion! Exactly what I was planning to do. :)
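To make the thread above concrete: `isTableInvalidated` is a test-side helper on the in-memory catalog, not part of the public `TableCatalog` API. A minimal, self-contained sketch of a catalog that records invalidations so a suite can assert on the effect of `REFRESH TABLE` (the class and field names are illustrative, not the actual `InMemoryTableCatalog` code):

    import java.util

    import scala.collection.mutable

    import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableChange}
    import org.apache.spark.sql.connector.expressions.Transform
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    // Records every identifier passed to invalidateTable so a test can check that
    // REFRESH TABLE actually reached the catalog. Only the invalidation path is real;
    // the remaining TableCatalog methods are stubbed because the test does not use them.
    class RecordingInvalidationCatalog extends TableCatalog {
      private val invalidated = mutable.Set.empty[Identifier]
      private var catalogName: String = _

      override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
        catalogName = name
      }
      override def name(): String = catalogName

      override def invalidateTable(ident: Identifier): Unit = invalidated += ident

      // Mirrors the isTableInvalidated check used by the REFRESH TABLE test below.
      def isTableInvalidated(ident: Identifier): Boolean = invalidated.contains(ident)

      private def unsupported = throw new UnsupportedOperationException("sketch only")
      override def listTables(namespace: Array[String]): Array[Identifier] = unsupported
      override def loadTable(ident: Identifier): Table = unsupported
      override def createTable(
          ident: Identifier,
          schema: StructType,
          partitions: Array[Transform],
          properties: util.Map[String, String]): Table = unsupported
      override def alterTable(ident: Identifier, changes: TableChange*): Table = unsupported
      override def dropTable(ident: Identifier): Boolean = unsupported
      override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = unsupported
    }

In the suite itself, the equivalent bookkeeping lives in `InMemoryTableCatalog`, which is why the test can cast to it and call `isTableInvalidated`.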
+      assert(testCatalog.isTableInvalidated(identifier))
+    }
+  }
+
   test("REPLACE TABLE: v1 table") {
     val e = intercept[AnalysisException] {
       sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}")

@@ -1211,28 +1225,16 @@ class DataSourceV2SQLSuite
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
       spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")

-      val e = intercept[AnalysisException] {
-        sql(s"ANALYZE TABLE $t COMPUTE STATISTICS")
-      }
-      assert(e.message.contains("ANALYZE TABLE is only supported with v1 tables"))
-
-      val e2 = intercept[AnalysisException] {
-        sql(s"ANALYZE TABLE $t COMPUTE STATISTICS FOR ALL COLUMNS")
-      }
-      assert(e2.message.contains("ANALYZE TABLE is only supported with v1 tables"))
+      testV1Command("ANALYZE TABLE", s"$t COMPUTE STATISTICS")
+      testV1Command("ANALYZE TABLE", s"$t COMPUTE STATISTICS FOR ALL COLUMNS")
     }
   }

   test("MSCK REPAIR TABLE") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
       spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")

-      val e = intercept[AnalysisException] {
-        sql(s"MSCK REPAIR TABLE $t")
-      }
-      assert(e.message.contains("MSCK REPAIR TABLE is only supported with v1 tables"))
+      testV1Command("MSCK REPAIR TABLE", t)
     }
   }

@@ -1246,15 +1248,8 @@ class DataSourceV2SQLSuite
         |PARTITIONED BY (id)
       """.stripMargin)

-      val e1 = intercept[AnalysisException] {
-        sql(s"TRUNCATE TABLE $t")
-      }
-      assert(e1.message.contains("TRUNCATE TABLE is only supported with v1 tables"))
-
-      val e2 = intercept[AnalysisException] {
-        sql(s"TRUNCATE TABLE $t PARTITION(id='1')")
-      }
-      assert(e2.message.contains("TRUNCATE TABLE is only supported with v1 tables"))
+      testV1Command("TRUNCATE TABLE", t)
+      testV1Command("TRUNCATE TABLE", s"$t PARTITION(id='1')")
     }
   }

@@ -1268,16 +1263,16 @@ class DataSourceV2SQLSuite
         |PARTITIONED BY (id)
       """.stripMargin)

-      val e1 = intercept[AnalysisException] {
-        val partition = sql(s"SHOW PARTITIONS $t")
-      }
-      assert(e1.message.contains("SHOW PARTITIONS is only supported with v1 tables"))
-
-      val e2 = intercept[AnalysisException] {
-        val partition2 = sql(s"SHOW PARTITIONS $t PARTITION(id='1')")
-      }
-      assert(e2.message.contains("SHOW PARTITIONS is only supported with v1 tables"))
+      testV1Command("SHOW PARTITIONS", t)
+      testV1Command("SHOW PARTITIONS", s"$t PARTITION(id='1')")
     }
   }

+  private def testV1Command(sqlCommand: String, sqlParams: String): Unit = {
+    val e = intercept[AnalysisException] {
+      sql(s"$sqlCommand $sqlParams")
+    }
+    assert(e.message.contains(s"$sqlCommand is only supported with v1 tables"))
+  }
+
   private def assertAnalysisError(sqlStatement: String, expectedError: String): Unit = {