Commit dd1fbae

Author: Andrew Or
Refactor CatalogTable to use TableIdentifier
This is a standalone commit so that in the future we can split it out into a separate patch if preferable.
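
In short, the change collapses two CatalogTable fields into one. A sketch abbreviated from the interface.scala diff below (elided fields marked with /* ... */):

    // Before: the database and the table name were separate fields.
    case class CatalogTable(
        specifiedDatabase: Option[String],
        name: String,
        /* ... */)

    // After: a single TableIdentifier carries both parts.
    case class CatalogTable(
        name: TableIdentifier,  // TableIdentifier(table: String, database: Option[String])
        /* ... */)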
Parent: 39a153c

13 files changed: +54 additions, -62 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

Lines changed: 8 additions & 6 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
 import scala.collection.mutable

 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.TableIdentifier


 /**
@@ -156,12 +157,13 @@ class InMemoryCatalog extends ExternalCatalog {
       tableDefinition: CatalogTable,
       ignoreIfExists: Boolean): Unit = synchronized {
     requireDbExists(db)
-    if (existsTable(db, tableDefinition.name)) {
+    val table = tableDefinition.name.table
+    if (existsTable(db, table)) {
       if (!ignoreIfExists) {
-        throw new AnalysisException(s"Table ${tableDefinition.name} already exists in $db database")
+        throw new AnalysisException(s"Table $table already exists in $db database")
       }
     } else {
-      catalog(db).tables.put(tableDefinition.name, new TableDesc(tableDefinition))
+      catalog(db).tables.put(table, new TableDesc(tableDefinition))
     }
   }

@@ -182,14 +184,14 @@ class InMemoryCatalog extends ExternalCatalog {
   override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
     requireTableExists(db, oldName)
     val oldDesc = catalog(db).tables(oldName)
-    oldDesc.table = oldDesc.table.copy(name = newName)
+    oldDesc.table = oldDesc.table.copy(name = TableIdentifier(newName, Some(db)))
     catalog(db).tables.put(newName, oldDesc)
     catalog(db).tables.remove(oldName)
   }

   override def alterTable(db: String, tableDefinition: CatalogTable): Unit = synchronized {
-    requireTableExists(db, tableDefinition.name)
-    catalog(db).tables(tableDefinition.name).table = tableDefinition
+    requireTableExists(db, tableDefinition.name.table)
+    catalog(db).tables(tableDefinition.name.table).table = tableDefinition
   }

   override def getTable(db: String, table: String): CatalogTable = synchronized {
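
The in-memory catalog still keys its per-database table map by the bare table name, which is why the hunks above unwrap the String with `.table` before touching the map. A minimal, runnable sketch of that keying convention (`Ident` is a stand-in, not Spark's TableIdentifier):

    import scala.collection.mutable

    // Stand-in for TableIdentifier: a table name plus an optional database.
    case class Ident(table: String, database: Option[String])

    object KeyingDemo extends App {
      val tables = mutable.HashMap.empty[String, Ident]
      val ident = Ident("tbl1", Some("db2"))
      tables.put(ident.table, ident)   // keyed by the bare name "tbl1",
      assert(tables.contains("tbl1"))  // never by "db2.tbl1"
      println(tables)
    }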

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 4 additions & 4 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
 import javax.annotation.Nullable

 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}


 /**
@@ -212,8 +213,7 @@ case class CatalogTablePartition(
  * future once we have a better understanding of how we want to handle skewed columns.
  */
 case class CatalogTable(
-    specifiedDatabase: Option[String],
-    name: String,
+    name: TableIdentifier,
     tableType: CatalogTableType,
     storage: CatalogStorageFormat,
     schema: Seq[CatalogColumn],
@@ -227,12 +227,12 @@ case class CatalogTable(
     viewText: Option[String] = None) {

   /** Return the database this table was specified to belong to, assuming it exists. */
-  def database: String = specifiedDatabase.getOrElse {
+  def database: String = name.database.getOrElse {
     throw new AnalysisException(s"table $name did not specify database")
   }

   /** Return the fully qualified name of this table, assuming the database was specified. */
-  def qualifiedName: String = s"$database.$name"
+  def qualifiedName: String = name.unquotedString

   /** Syntactic sugar to update a field in `storage`. */
   def withNewStorage(
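
To make the two rewritten accessors concrete, here is a self-contained model of the identifier they delegate to (a stand-in, not Spark's TableIdentifier; in Spark, `unquotedString` renders `db.table` without backticks):

    // Stand-in mirroring the accessors CatalogTable now leans on.
    case class Ident(table: String, database: Option[String]) {
      def unquotedString: String = (database.toSeq :+ table).mkString(".")
    }

    object AccessorDemo extends App {
      val qualified = Ident("page_view", Some("mydb"))
      // database: unwrap the Option or fail, as CatalogTable.database does.
      println(qualified.database.getOrElse(sys.error("did not specify database")))  // mydb
      // qualifiedName: now simply name.unquotedString.
      println(qualified.unquotedString)  // mydb.page_view
    }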

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala

Lines changed: 3 additions & 3 deletions
@@ -21,6 +21,7 @@ import org.scalatest.BeforeAndAfterEach

 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.TableIdentifier


 /**
@@ -89,8 +90,7 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {

   private def newTable(name: String, db: String): CatalogTable = {
     CatalogTable(
-      specifiedDatabase = Some(db),
-      name = name,
+      name = TableIdentifier(name, Some(db)),
      tableType = CatalogTableType.EXTERNAL_TABLE,
      storage = storageFormat,
      schema = Seq(CatalogColumn("col1", "int"), CatalogColumn("col2", "string")),
@@ -277,7 +277,7 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
   }

   test("get table") {
-    assert(newBasicCatalog().getTable("db2", "tbl1").name == "tbl1")
+    assert(newBasicCatalog().getTable("db2", "tbl1").name.table == "tbl1")
   }

   test("get table when database/table does not exist") {

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala

Lines changed: 5 additions & 4 deletions
@@ -27,6 +27,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.NoSuchItemException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.catalyst.TableIdentifier


 /**
@@ -73,10 +74,10 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit
   }

   private def requireDbMatches(db: String, table: CatalogTable): Unit = {
-    if (table.specifiedDatabase != Some(db)) {
+    if (table.name.database != Some(db)) {
       throw new AnalysisException(
         s"Provided database $db does not much the one specified in the " +
-          s"table definition (${table.specifiedDatabase.getOrElse("n/a")})")
+          s"table definition (${table.name.database.getOrElse("n/a")})")
     }
   }

@@ -160,7 +161,7 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit
   }

   override def renameTable(db: String, oldName: String, newName: String): Unit = withClient {
-    val newTable = client.getTable(db, oldName).copy(name = newName)
+    val newTable = client.getTable(db, oldName).copy(name = TableIdentifier(newName, Some(db)))
     client.alterTable(oldName, newTable)
   }

@@ -173,7 +174,7 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit
    */
   override def alterTable(db: String, tableDefinition: CatalogTable): Unit = withClient {
     requireDbMatches(db, tableDefinition)
-    requireTableExists(db, tableDefinition.name)
+    requireTableExists(db, tableDefinition.name.table)
     client.alterTable(tableDefinition)
   }

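Note that requireDbMatches above now compares the Option inside the identifier directly, so a table whose identifier carries no database at all also fails the check. A minimal sketch of that comparison (stand-in types, hypothetical exception choice):

    case class Ident(table: String, database: Option[String])

    def requireDbMatches(db: String, name: Ident): Unit =
      if (name.database != Some(db)) {
        throw new IllegalArgumentException(
          s"Provided database $db does not match the one specified in the " +
            s"table definition (${name.database.getOrElse("n/a")})")
      }

    // requireDbMatches("db1", Ident("t", Some("db2")))  // throws, reporting db2
    // requireDbMatches("db1", Ident("t", None))         // throws, reporting n/a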

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 10 additions & 16 deletions
@@ -118,8 +118,8 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte

   private def getQualifiedTableName(t: CatalogTable): QualifiedTableName = {
     QualifiedTableName(
-      t.specifiedDatabase.getOrElse(client.currentDatabase).toLowerCase,
-      t.name.toLowerCase)
+      t.name.database.getOrElse(client.currentDatabase).toLowerCase,
+      t.name.table.toLowerCase)
   }

   /** A cache of Spark SQL data source tables that have been accessed. */
@@ -293,8 +293,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte

     def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
       CatalogTable(
-        specifiedDatabase = Option(dbName),
-        name = tblName,
+        name = TableIdentifier(tblName, Option(dbName)),
         tableType = tableType,
         schema = Nil,
         storage = CatalogStorageFormat(
@@ -314,8 +313,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
       assert(relation.partitionSchema.isEmpty)

       CatalogTable(
-        specifiedDatabase = Option(dbName),
-        name = tblName,
+        name = TableIdentifier(tblName, Option(dbName)),
         tableType = tableType,
         storage = CatalogStorageFormat(
           locationUri = Some(relation.location.paths.map(_.toUri.toString).head),
@@ -432,7 +430,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
       alias match {
         // because hive use things like `_c0` to build the expanded text
         // currently we cannot support view from "create view v1(c1) as ..."
-        case None => SubqueryAlias(table.name, hive.parseSql(viewText))
+        case None => SubqueryAlias(table.name.table, hive.parseSql(viewText))
         case Some(aliasText) => SubqueryAlias(aliasText, hive.parseSql(viewText))
       }
     } else {
@@ -618,9 +616,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
       val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)

       execution.CreateViewAsSelect(
-        table.copy(
-          specifiedDatabase = Some(dbName),
-          name = tblName),
+        table.copy(name = TableIdentifier(tblName, Some(dbName))),
         child,
         allowExisting,
         replace)
@@ -642,15 +638,15 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
       if (hive.convertCTAS && table.storage.serde.isEmpty) {
         // Do the conversion when spark.sql.hive.convertCTAS is true and the query
         // does not specify any storage format (file format and storage handler).
-        if (table.specifiedDatabase.isDefined) {
+        if (table.name.database.isDefined) {
           throw new AnalysisException(
             "Cannot specify database name in a CTAS statement " +
               "when spark.sql.hive.convertCTAS is set to true.")
         }

         val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
         CreateTableUsingAsSelect(
-          TableIdentifier(desc.name),
+          TableIdentifier(desc.name.table),
           conf.defaultDataSourceName,
           temporary = false,
           Array.empty[String],
@@ -671,9 +667,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
       val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)

       execution.CreateTableAsSelect(
-        desc.copy(
-          specifiedDatabase = Some(dbName),
-          name = tblName),
+        desc.copy(name = TableIdentifier(tblName, Some(dbName))),
         child,
         allowExisting)
     }
@@ -824,7 +818,7 @@ private[hive] case class MetastoreRelation(
     // We start by constructing an API table as Hive performs several important transformations
     // internally when converting an API table to a QL table.
     val tTable = new org.apache.hadoop.hive.metastore.api.Table()
-    tTable.setTableName(table.name)
+    tTable.setTableName(table.name.table)
     tTable.setDbName(table.database)

     val tableParameters = new java.util.HashMap[String, String]()
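
getQualifiedTableName above now reads both parts out of the identifier, falling back to the client's current database when none was specified and lower-casing both parts for metastore lookups. A runnable stand-in sketch (currentDatabase is a hypothetical placeholder for client.currentDatabase):

    case class Ident(table: String, database: Option[String])
    case class QualifiedTableName(database: String, name: String)

    object QualifyDemo extends App {
      val currentDatabase = "default"  // placeholder for client.currentDatabase

      def getQualifiedTableName(t: Ident): QualifiedTableName =
        QualifiedTableName(
          t.database.getOrElse(currentDatabase).toLowerCase,
          t.table.toLowerCase)

      println(getQualifiedTableName(Ident("Page_View", None)))          // QualifiedTableName(default,page_view)
      println(getQualifiedTableName(Ident("Page_View", Some("MyDB"))))  // QualifiedTableName(mydb,page_view)
    }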

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala

Lines changed: 5 additions & 9 deletions
@@ -60,7 +60,7 @@ private[hive] case class CreateTableAsSelect(

   override def output: Seq[Attribute] = Seq.empty[Attribute]
   override lazy val resolved: Boolean =
-    tableDesc.specifiedDatabase.isDefined &&
+    tableDesc.name.database.isDefined &&
     tableDesc.schema.nonEmpty &&
     tableDesc.storage.serde.isDefined &&
     tableDesc.storage.inputFormat.isDefined &&
@@ -185,13 +185,10 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
       properties: Map[String, String],
       allowExist: Boolean,
       replace: Boolean): CreateViewAsSelect = {
-    val TableIdentifier(viewName, dbName) = extractTableIdent(viewNameParts)
-
+    val tableIdentifier = extractTableIdent(viewNameParts)
     val originalText = query.source
-
     val tableDesc = CatalogTable(
-      specifiedDatabase = dbName,
-      name = viewName,
+      name = tableIdentifier,
       tableType = CatalogTableType.VIRTUAL_VIEW,
       schema = schema,
       storage = CatalogStorageFormat(
@@ -356,12 +353,11 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
         "TOK_TABLELOCATION",
         "TOK_TABLEPROPERTIES"),
       children)
-    val TableIdentifier(tblName, dbName) = extractTableIdent(tableNameParts)
+    val tableIdentifier = extractTableIdent(tableNameParts)

     // TODO add bucket support
     var tableDesc: CatalogTable = CatalogTable(
-      specifiedDatabase = dbName,
-      name = tblName,
+      name = tableIdentifier,
       tableType =
         if (externalTable.isDefined) {
           CatalogTableType.EXTERNAL_TABLE

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala

Lines changed: 1 addition & 1 deletion
@@ -91,7 +91,7 @@ private[hive] trait HiveClient {
   def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit

   /** Alter a table whose name matches the one specified in `table`, assuming it exists. */
-  final def alterTable(table: CatalogTable): Unit = alterTable(table.name, table)
+  final def alterTable(table: CatalogTable): Unit = alterTable(table.name.table, table)

   /** Updates the given table with new metadata, optionally renaming the table. */
   def alterTable(tableName: String, table: CatalogTable): Unit

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

Lines changed: 3 additions & 3 deletions
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.security.UserGroupInformation

 import org.apache.spark.{Logging, SparkConf, SparkException}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Expression
@@ -298,8 +299,7 @@ private[hive] class HiveClientImpl(
     logDebug(s"Looking up $dbName.$tableName")
     Option(client.getTable(dbName, tableName, false)).map { h =>
       CatalogTable(
-        specifiedDatabase = Option(h.getDbName),
-        name = h.getTableName,
+        name = TableIdentifier(h.getTableName, Option(h.getDbName)),
         tableType = h.getTableType match {
           case HiveTableType.EXTERNAL_TABLE => CatalogTableType.EXTERNAL_TABLE
           case HiveTableType.MANAGED_TABLE => CatalogTableType.MANAGED_TABLE
@@ -639,7 +639,7 @@ private[hive] class HiveClientImpl(
   }

   private def toHiveTable(table: CatalogTable): HiveTable = {
-    val hiveTable = new HiveTable(table.database, table.name)
+    val hiveTable = new HiveTable(table.database, table.name.table)
     hiveTable.setTableType(table.tableType match {
       case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE
       case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE
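
One detail worth calling out in the lookup path above: the identifier's database is built with Option(h.getDbName) rather than Some(...), because the underlying Hive API is Java and can return null, and Option(...) maps null to None. A one-file illustration of the difference:

    object NullSafetyDemo extends App {
      val fromJava: String = null  // what a Java getter may hand back
      println(Option(fromJava))    // None
      println(Some(fromJava))      // Some(null) -- the trap Option(...) avoids
    }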

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala

Lines changed: 2 additions & 2 deletions
@@ -38,7 +38,7 @@ case class CreateTableAsSelect(
     allowExisting: Boolean)
   extends RunnableCommand {

-  val tableIdentifier = TableIdentifier(tableDesc.name, Some(tableDesc.database))
+  private val tableIdentifier = tableDesc.name

   override def children: Seq[LogicalPlan] = Seq(query)

@@ -93,6 +93,6 @@ case class CreateTableAsSelect(
   }

   override def argString: String = {
-    s"[Database:${tableDesc.database}}, TableName: ${tableDesc.name}, InsertIntoHiveTable]"
+    s"[Database:${tableDesc.database}}, TableName: ${tableDesc.name.table}, InsertIntoHiveTable]"
   }
 }
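
A side effect of the one-line rewrite above: the old code eagerly evaluated tableDesc.database while reconstructing the TableIdentifier, which throws when no database was specified, whereas the new code reuses the identifier untouched. A stand-in sketch of the difference (Ident models the pre-existing accessor contract):

    case class Ident(table: String, database: Option[String]) {
      def db: String = database.getOrElse(sys.error(s"table $table did not specify database"))
    }

    object EagerVsPassThroughDemo extends App {
      val noDb = Ident("t1", None)
      // Old shape: rebuilding the identifier forces the database lookup.
      // val old = Ident(noDb.table, Some(noDb.db))  // would throw here
      // New shape: the identifier is passed through as-is.
      val tableIdentifier = noDb
      println(tableIdentifier)  // Ident(t1,None)
    }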

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala

Lines changed: 2 additions & 2 deletions
@@ -44,7 +44,7 @@ private[hive] case class CreateViewAsSelect(
   assert(tableDesc.schema == Nil || tableDesc.schema.length == childSchema.length)
   assert(tableDesc.viewText.isDefined)

-  val tableIdentifier = TableIdentifier(tableDesc.name, Some(tableDesc.database))
+  private val tableIdentifier = tableDesc.name

   override def run(sqlContext: SQLContext): Seq[Row] = {
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
@@ -116,7 +116,7 @@ private[hive] case class CreateViewAsSelect(
     }

     val viewText = tableDesc.viewText.get
-    val viewName = quote(tableDesc.name)
+    val viewName = quote(tableDesc.name.table)
     s"SELECT $viewOutput FROM ($viewText) $viewName"
   }


sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala

Lines changed: 8 additions & 8 deletions
@@ -54,8 +54,8 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {

     val (desc, exists) = extractTableDesc(s1)
     assert(exists)
-    assert(desc.specifiedDatabase == Some("mydb"))
-    assert(desc.name == "page_view")
+    assert(desc.name.database == Some("mydb"))
+    assert(desc.name.table == "page_view")
     assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE)
     assert(desc.storage.locationUri == Some("/user/external/page_view"))
     assert(desc.schema ==
@@ -100,8 +100,8 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {

     val (desc, exists) = extractTableDesc(s2)
     assert(exists)
-    assert(desc.specifiedDatabase == Some("mydb"))
-    assert(desc.name == "page_view")
+    assert(desc.name.database == Some("mydb"))
+    assert(desc.name.table == "page_view")
     assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE)
     assert(desc.storage.locationUri == Some("/user/external/page_view"))
     assert(desc.schema ==
@@ -127,8 +127,8 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
     val s3 = """CREATE TABLE page_view AS SELECT * FROM src"""
     val (desc, exists) = extractTableDesc(s3)
     assert(exists == false)
-    assert(desc.specifiedDatabase == None)
-    assert(desc.name == "page_view")
+    assert(desc.name.database == None)
+    assert(desc.name.table == "page_view")
     assert(desc.tableType == CatalogTableType.MANAGED_TABLE)
     assert(desc.storage.locationUri == None)
     assert(desc.schema == Seq.empty[CatalogColumn])
@@ -162,8 +162,8 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
       | ORDER BY key, value""".stripMargin
     val (desc, exists) = extractTableDesc(s5)
     assert(exists == false)
-    assert(desc.specifiedDatabase == None)
-    assert(desc.name == "ctas2")
+    assert(desc.name.database == None)
+    assert(desc.name.table == "ctas2")
     assert(desc.tableType == CatalogTableType.MANAGED_TABLE)
     assert(desc.storage.locationUri == None)
     assert(desc.schema == Seq.empty[CatalogColumn])

sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala

Lines changed: 1 addition & 2 deletions
@@ -719,8 +719,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     withTable(tableName) {
       val schema = StructType(StructField("int", IntegerType, true) :: Nil)
       val hiveTable = CatalogTable(
-        specifiedDatabase = Some("default"),
-        name = tableName,
+        name = TableIdentifier(tableName, Some("default")),
         tableType = CatalogTableType.MANAGED_TABLE,
         schema = Seq.empty,
         storage = CatalogStorageFormat(
