Skip to content

[SPARK-16867][SQL] createTable and alterTable in ExternalCatalog should not take db #14476

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,21 @@ abstract class ExternalCatalog {
// Tables
// --------------------------------------------------------------------------

def createTable(db: String, tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit

def dropTable(db: String, table: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit

def renameTable(db: String, oldName: String, newName: String): Unit

/**
* Alter a table whose name that matches the one specified in `tableDefinition`,
* assuming the table exists.
* Alter a table whose database and name match the ones specified in `tableDefinition`, assuming
* the table exists. Note that, even though we can specify database in `tableDefinition`, it's
* used to identify the table, not to alter the table's database, which is not allowed.
*
* Note: If the underlying implementation does not support altering a certain field,
* this becomes a no-op.
*/
def alterTable(db: String, tableDefinition: CatalogTable): Unit
def alterTable(tableDefinition: CatalogTable): Unit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

document that this does not support moving database?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logically a table is referenced by a pair of dbName and tableName. If we have a table db1.tbl1, and then alter table db2.tbl1, the semantic is not moving tbl1 from db1 to db2, but altering a nonexistent table db2.tbl1. For ALTER TABLE, I think there is no such concept as moving a table between databases. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add doc to explain that it does not support moving a table to another db, since a developer may want to use it in this way after just looking at this API (tableDefinition has a db field).


def getTable(db: String, table: String): CatalogTable

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,10 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
// --------------------------------------------------------------------------

override def createTable(
db: String,
tableDefinition: CatalogTable,
ignoreIfExists: Boolean): Unit = synchronized {
assert(tableDefinition.identifier.database.isDefined)
val db = tableDefinition.identifier.database.get
requireDbExists(db)
val table = tableDefinition.identifier.table
if (tableExists(db, table)) {
Expand Down Expand Up @@ -266,7 +267,9 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
catalog(db).tables.remove(oldName)
}

override def alterTable(db: String, tableDefinition: CatalogTable): Unit = synchronized {
override def alterTable(tableDefinition: CatalogTable): Unit = synchronized {
assert(tableDefinition.identifier.database.isDefined)
val db = tableDefinition.identifier.database.get
requireTableExists(db, tableDefinition.identifier.table)
catalog(db).tables(tableDefinition.identifier.table).table = tableDefinition
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ class SessionCatalog(
val table = formatTableName(tableDefinition.identifier.table)
val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
requireDbExists(db)
externalCatalog.createTable(db, newTableDefinition, ignoreIfExists)
externalCatalog.createTable(newTableDefinition, ignoreIfExists)
}

/**
Expand All @@ -242,7 +242,7 @@ class SessionCatalog(
val newTableDefinition = tableDefinition.copy(identifier = tableIdentifier)
requireDbExists(db)
requireTableExists(tableIdentifier)
externalCatalog.alterTable(db, newTableDefinition)
externalCatalog.alterTable(newTableDefinition)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val catalog = newBasicCatalog()
val table =
newTable("external_table1", "db2").copy(tableType = CatalogTableType.EXTERNAL)
catalog.createTable("db2", table, ignoreIfExists = false)
catalog.createTable(table, ignoreIfExists = false)
val actual = catalog.getTable("db2", "external_table1")
assert(actual.tableType === CatalogTableType.EXTERNAL)
}
Expand Down Expand Up @@ -212,7 +212,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
test("alter table") {
val catalog = newBasicCatalog()
val tbl1 = catalog.getTable("db2", "tbl1")
catalog.alterTable("db2", tbl1.copy(properties = Map("toh" -> "frem")))
catalog.alterTable(tbl1.copy(properties = Map("toh" -> "frem")))
val newTbl1 = catalog.getTable("db2", "tbl1")
assert(!tbl1.properties.contains("toh"))
assert(newTbl1.properties.size == tbl1.properties.size + 1)
Expand All @@ -222,10 +222,10 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
test("alter table when database/table does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.alterTable("unknown_db", newTable("tbl1", "unknown_db"))
catalog.alterTable(newTable("tbl1", "unknown_db"))
}
intercept[AnalysisException] {
catalog.alterTable("db2", newTable("unknown_table", "db2"))
catalog.alterTable(newTable("unknown_table", "db2"))
}
}

Expand Down Expand Up @@ -266,7 +266,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
test("basic create and list partitions") {
val catalog = newEmptyCatalog()
catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
catalog.createTable("mydb", newTable("tbl", "mydb"), ignoreIfExists = false)
catalog.createTable(newTable("tbl", "mydb"), ignoreIfExists = false)
catalog.createPartitions("mydb", "tbl", Seq(part1, part2), ignoreIfExists = false)
assert(catalogPartitionsEqual(catalog, "mydb", "tbl", Seq(part1, part2)))
}
Expand Down Expand Up @@ -555,7 +555,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
schema = new StructType().add("a", "int").add("b", "string")
)

catalog.createTable("db1", table, ignoreIfExists = false)
catalog.createTable(table, ignoreIfExists = false)
assert(exists(db.locationUri, "my_table"))

catalog.renameTable("db1", "my_table", "your_table")
Expand All @@ -573,7 +573,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
None, None, None, false, Map.empty),
schema = new StructType().add("a", "int").add("b", "string")
)
catalog.createTable("db1", externalTable, ignoreIfExists = false)
catalog.createTable(externalTable, ignoreIfExists = false)
assert(!exists(db.locationUri, "external_table"))
}

Expand All @@ -591,7 +591,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
.add("b", "string"),
partitionColumnNames = Seq("a", "b")
)
catalog.createTable("db1", table, ignoreIfExists = false)
catalog.createTable(table, ignoreIfExists = false)

catalog.createPartitions("db1", "tbl", Seq(part1, part2), ignoreIfExists = false)
assert(exists(databaseDir, "tbl", "a=1", "b=2"))
Expand Down Expand Up @@ -665,8 +665,8 @@ abstract class CatalogTestUtils {
catalog.createDatabase(newDb("default"), ignoreIfExists = true)
catalog.createDatabase(newDb("db1"), ignoreIfExists = false)
catalog.createDatabase(newDb("db2"), ignoreIfExists = false)
catalog.createTable("db2", newTable("tbl1", "db2"), ignoreIfExists = false)
catalog.createTable("db2", newTable("tbl2", "db2"), ignoreIfExists = false)
catalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false)
catalog.createTable(newTable("tbl2", "db2"), ignoreIfExists = false)
catalog.createPartitions("db2", "tbl2", Seq(part1, part2), ignoreIfExists = false)
catalog.createFunction("db2", newFunc("func1", Some("db2")))
catalog
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,6 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu
}
}

private def requireDbMatches(db: String, table: CatalogTable): Unit = {
if (table.identifier.database != Some(db)) {
throw new AnalysisException(
s"Provided database '$db' does not match the one specified in the " +
s"table definition (${table.identifier.database.getOrElse("n/a")})")
}
}

private def requireTableExists(db: String, table: String): Unit = {
withClient { getTable(db, table) }
}
Expand Down Expand Up @@ -147,11 +139,11 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu
// --------------------------------------------------------------------------

override def createTable(
db: String,
tableDefinition: CatalogTable,
ignoreIfExists: Boolean): Unit = withClient {
assert(tableDefinition.identifier.database.isDefined)
val db = tableDefinition.identifier.database.get
requireDbExists(db)
requireDbMatches(db, tableDefinition)

if (
// If this is an external data source table...
Expand Down Expand Up @@ -211,8 +203,9 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu
* Note: As of now, this only supports altering table properties, serde properties,
* and num buckets!
*/
override def alterTable(db: String, tableDefinition: CatalogTable): Unit = withClient {
requireDbMatches(db, tableDefinition)
override def alterTable(tableDefinition: CatalogTable): Unit = withClient {
assert(tableDefinition.identifier.database.isDefined)
val db = tableDefinition.identifier.database.get
requireTableExists(db, tableDefinition.identifier.table)
client.alterTable(tableDefinition)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
DATASOURCE_SCHEMA -> schema.json,
"EXTERNAL" -> "FALSE"))

sharedState.externalCatalog.createTable("default", hiveTable, ignoreIfExists = false)
sharedState.externalCatalog.createTable(hiveTable, ignoreIfExists = false)

sessionState.refreshTable(tableName)
val actualSchema = table(tableName).schema
Expand Down