-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-25006][SQL] Add CatalogTableIdentifier. #21978
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,30 +18,32 @@ | |
package org.apache.spark.sql.catalyst | ||
|
||
/** | ||
* An identifier that optionally specifies a database. | ||
* An identifier that optionally specifies a database and catalog. | ||
* | ||
* Format (unquoted): "name" or "db.name" | ||
* Format (quoted): "`name`" or "`db`.`name`" | ||
*/ | ||
sealed trait IdentifierWithDatabase { | ||
sealed trait IdentifierWithOptionalDatabaseAndCatalog { | ||
val identifier: String | ||
|
||
def database: Option[String] | ||
|
||
def catalog: Option[String] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Default to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an abstract method definition and catalog is always implemented by a val. |
||
|
||
/* | ||
* Escapes back-ticks within the identifier name with double-back-ticks. | ||
*/ | ||
private def quoteIdentifier(name: String): String = name.replace("`", "``") | ||
|
||
def quotedString: String = { | ||
val replacedId = quoteIdentifier(identifier) | ||
val replacedDb = database.map(quoteIdentifier(_)) | ||
|
||
if (replacedDb.isDefined) s"`${replacedDb.get}`.`$replacedId`" else s"`$replacedId`" | ||
// database is required if catalog is present | ||
assert(database.isDefined || catalog.isEmpty) | ||
def q(s: String): String = s"`${quoteIdentifier(s)}`" | ||
Seq(catalog.map(q), database.map(q), Some(q(identifier))).flatten.mkString(".") | ||
} | ||
|
||
def unquotedString: String = { | ||
if (database.isDefined) s"${database.get}.$identifier" else identifier | ||
Seq(catalog, database, Some(identifier)).flatten.mkString(".") | ||
} | ||
|
||
override def toString: String = quotedString | ||
|
@@ -64,18 +66,74 @@ object AliasIdentifier { | |
def apply(identifier: String): AliasIdentifier = new AliasIdentifier(identifier) | ||
} | ||
|
||
object CatalogTableIdentifier { | ||
def apply(table: String): CatalogTableIdentifier = | ||
new CatalogTableIdentifier(table, None, None) | ||
|
||
def apply(table: String, database: String): CatalogTableIdentifier = | ||
new CatalogTableIdentifier(table, Some(database), None) | ||
|
||
def apply(table: String, database: String, catalog: String): CatalogTableIdentifier = | ||
new CatalogTableIdentifier(table, Some(database), Some(catalog)) | ||
} | ||
|
||
/** | ||
* Identifies a table in a database. | ||
* If `database` is not defined, the current database is used. | ||
* When we register a permanent function in the FunctionRegistry, we use | ||
* unquotedString as the function name. | ||
* Identifies a table in a database and catalog. | ||
* If `database` is not defined, the current catalog's default database is used. | ||
* If `catalog` is not defined, the current catalog is used. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "current" meaning "global"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, we want to move away from a special global catalog. I think that Spark should have a current catalog, like a current database, which is used to resolve references that don't have an explicit catalog. That would have a default, just like the current database has a default. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good. When we add the logical side of leveraging catalogs we can revisit the API of how to set the current catalog. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. This introduces the ability to expose a catalog to Spark. It doesn't actually add any user-facing operations. |
||
*/ | ||
case class TableIdentifier(table: String, database: Option[String]) | ||
extends IdentifierWithDatabase { | ||
case class CatalogTableIdentifier(table: String, database: Option[String], catalog: Option[String]) | ||
extends IdentifierWithOptionalDatabaseAndCatalog { | ||
|
||
// ensure database is present if catalog is defined | ||
assert(database.isDefined || catalog.isEmpty) | ||
|
||
override val identifier: String = table | ||
|
||
/** | ||
* Returns this as a TableIdentifier if its catalog is not set, fail otherwise. | ||
* | ||
* This is used to provide TableIdentifier for paths that do not support the catalog element. To | ||
* ensure that the identifier is compatible, this asserts that the catalog element is not defined. | ||
*/ | ||
lazy val asTableIdentifier: TableIdentifier = { | ||
assert(catalog.isEmpty, s"Cannot convert to TableIdentifier: catalog is ${catalog.get} != None") | ||
new TableIdentifier(table, database) | ||
} | ||
|
||
/** | ||
* Returns this CatalogTableIdentifier without the catalog. | ||
* | ||
* This is used for code paths where the catalog has already been used. | ||
*/ | ||
lazy val dropCatalog: CatalogTableIdentifier = catalog match { | ||
case Some(_) => CatalogTableIdentifier(table, database, None) | ||
case _ => this | ||
} | ||
} | ||
|
||
|
||
/** | ||
* Identifies a table in a database. | ||
* If `database` is not defined, the current database is used. | ||
* | ||
* This class is used instead of CatalogTableIdentifier in paths that do not yet support table | ||
* identifiers with catalogs. | ||
*/ | ||
class TableIdentifier(table: String, db: Option[String]) | ||
extends CatalogTableIdentifier(table, db, None) { | ||
|
||
def this(table: String) = this(table, None) | ||
|
||
override lazy val asTableIdentifier: TableIdentifier = this | ||
|
||
override def copy( | ||
name: String = this.table, | ||
database: Option[String] = this.db, | ||
catalog: Option[String] = None): TableIdentifier = { | ||
assert(catalog.isEmpty, "Cannot add catalog to a TableIdentifier using copy") | ||
new TableIdentifier(name, database) | ||
} | ||
} | ||
|
||
/** A fully qualified identifier for a table (i.e., database.tableName) */ | ||
|
@@ -84,19 +142,27 @@ case class QualifiedTableName(database: String, name: String) { | |
} | ||
|
||
object TableIdentifier { | ||
def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName) | ||
def apply(table: String): TableIdentifier = | ||
new TableIdentifier(table) | ||
|
||
def apply(table: String, database: Option[String]): TableIdentifier = | ||
new TableIdentifier(table, database) | ||
} | ||
|
||
|
||
/** | ||
* Identifies a function in a database. | ||
* If `database` is not defined, the current database is used. | ||
* When we register a permanent function in the FunctionRegistry, we use | ||
* unquotedString as the function name. | ||
*/ | ||
case class FunctionIdentifier(funcName: String, database: Option[String]) | ||
extends IdentifierWithDatabase { | ||
extends IdentifierWithOptionalDatabaseAndCatalog { | ||
|
||
override val identifier: String = funcName | ||
|
||
override val catalog: Option[String] = None | ||
|
||
def this(funcName: String) = this(funcName, None) | ||
|
||
override def toString: String = unquotedString | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update formats in these scaladocs.