
[SPARK-25006][SQL] Add CatalogTableIdentifier. #21978


Closed

@@ -58,6 +58,10 @@ singleTableIdentifier
: tableIdentifier EOF
;

singleCatalogTableIdentifier
: catalogTableIdentifier EOF
;

singleFunctionIdentifier
: functionIdentifier EOF
;
@@ -538,6 +542,10 @@ rowFormat
(NULL DEFINED AS nullDefinedAs=STRING)? #rowFormatDelimited
;

catalogTableIdentifier
: ((catalog=identifier '.')? db=identifier '.')? table=identifier
;
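
As written, this rule accepts a bare table name, a database-qualified name, or a fully qualified catalog.db.table name; because the catalog part is nested inside the optional database part, a catalog can only be given when a database is also given.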

tableIdentifier
: (db=identifier '.')? table=identifier
;
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.{CatalogTableIdentifier, FunctionIdentifier, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
@@ -38,15 +38,17 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
/**
* Holds the name of a relation that has yet to be looked up in a catalog.
*
* @param tableIdentifier table name
* @param table a [[CatalogTableIdentifier]]
*/
case class UnresolvedRelation(tableIdentifier: TableIdentifier)
extends LeafNode {
case class UnresolvedRelation(table: CatalogTableIdentifier) extends LeafNode {

/** Returns a `.` separated name for this relation. */
def tableName: String = tableIdentifier.unquotedString
def tableName: String = table.unquotedString

override def output: Seq[Attribute] = Nil
/** Returns the table identifier without the catalog element */
def tableIdentifier: TableIdentifier = table.asTableIdentifier

override def output: Seq[AttributeReference] = Nil

override lazy val resolved = false
}
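
A small sketch (not part of the diff; identifier names are made up) of how the reworked UnresolvedRelation behaves with a catalog-qualified identifier:

val rel = UnresolvedRelation(CatalogTableIdentifier("t1", Some("db1"), Some("prod")))
rel.tableName        // "prod.db1.t1"
rel.tableIdentifier  // fails the assertion in asTableIdentifier because the catalog is still set
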
@@ -18,30 +18,32 @@
package org.apache.spark.sql.catalyst

/**
* An identifier that optionally specifies a database.
* An identifier that optionally specifies a database and catalog.
*
* Format (unquoted): "name" or "db.name"
* Format (quoted): "`name`" or "`db`.`name`"
*/

[Contributor] Update formats in these scaladocs.

sealed trait IdentifierWithDatabase {
sealed trait IdentifierWithOptionalDatabaseAndCatalog {
val identifier: String

def database: Option[String]

def catalog: Option[String]

[Member] Default to None?

[Contributor Author] This is an abstract method definition and catalog is always implemented by a val.

/*
* Escapes back-ticks within the identifier name with double-back-ticks.
*/
private def quoteIdentifier(name: String): String = name.replace("`", "``")

def quotedString: String = {
val replacedId = quoteIdentifier(identifier)
val replacedDb = database.map(quoteIdentifier(_))

if (replacedDb.isDefined) s"`${replacedDb.get}`.`$replacedId`" else s"`$replacedId`"
// database is required if catalog is present
assert(database.isDefined || catalog.isEmpty)
def q(s: String): String = s"`${quoteIdentifier(s)}`"
Seq(catalog.map(q), database.map(q), Some(q(identifier))).flatten.mkString(".")
}

def unquotedString: String = {
if (database.isDefined) s"${database.get}.$identifier" else identifier
Seq(catalog, database, Some(identifier)).flatten.mkString(".")
}

override def toString: String = quotedString
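
To make the new formats concrete, a minimal sketch (not part of the diff; names are made up) using the CatalogTableIdentifier introduced below:

val id = CatalogTableIdentifier("events", Some("db1"), Some("prod"))
id.unquotedString                                    // prod.db1.events
id.quotedString                                      // `prod`.`db1`.`events`
TableIdentifier("events", Some("db1")).quotedString  // `db1`.`events`
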
@@ -64,18 +66,74 @@ object AliasIdentifier {
def apply(identifier: String): AliasIdentifier = new AliasIdentifier(identifier)
}

object CatalogTableIdentifier {
def apply(table: String): CatalogTableIdentifier =
new CatalogTableIdentifier(table, None, None)

def apply(table: String, database: String): CatalogTableIdentifier =
new CatalogTableIdentifier(table, Some(database), None)

def apply(table: String, database: String, catalog: String): CatalogTableIdentifier =
new CatalogTableIdentifier(table, Some(database), Some(catalog))
}

/**
* Identifies a table in a database.
* If `database` is not defined, the current database is used.
* When we register a permanent function in the FunctionRegistry, we use
* unquotedString as the function name.
* Identifies a table in a database and catalog.
* If `database` is not defined, the current catalog's default database is used.
* If `catalog` is not defined, the current catalog is used.
*/

[Contributor] "current" meaning "global"?

[Contributor Author] No, we want to move away from a special global catalog. I think that Spark should have a current catalog, like a current database, which is used to resolve references that don't have an explicit catalog. That would have a default, just like the current database has a default.

[Contributor] Sounds good. When we add the logical side of leveraging catalogs we can revisit the API of how to set the current catalog.

[Contributor Author] Agreed. This introduces the ability to expose a catalog to Spark. It doesn't actually add any user-facing operations.

case class TableIdentifier(table: String, database: Option[String])
extends IdentifierWithDatabase {
case class CatalogTableIdentifier(table: String, database: Option[String], catalog: Option[String])
extends IdentifierWithOptionalDatabaseAndCatalog {

// ensure database is present if catalog is defined
assert(database.isDefined || catalog.isEmpty)

override val identifier: String = table

/**
* Returns this as a TableIdentifier if its catalog is not set, fail otherwise.
*
* This is used to provide TableIdentifier for paths that do not support the catalog element. To
* ensure that the identifier is compatible, this asserts that the catalog element is not defined.
*/
lazy val asTableIdentifier: TableIdentifier = {
assert(catalog.isEmpty, s"Cannot convert to TableIdentifier: catalog is ${catalog.get} != None")
new TableIdentifier(table, database)
}

/**
* Returns this CatalogTableIdentifier without the catalog.
*
* This is used for code paths where the catalog has already been used.
*/
lazy val dropCatalog: CatalogTableIdentifier = catalog match {
case Some(_) => CatalogTableIdentifier(table, database, None)
case _ => this
}
}
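
A brief sketch (not part of the diff; names are made up) of the two conversions:

val id = CatalogTableIdentifier("t1", Some("db1"), Some("prod"))
id.dropCatalog                    // CatalogTableIdentifier("t1", Some("db1"), None)
id.dropCatalog.asTableIdentifier  // TableIdentifier("t1", Some("db1"))
id.asTableIdentifier              // fails the assertion: the catalog element is still set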


/**
* Identifies a table in a database.
* If `database` is not defined, the current database is used.
*
* This class is used instead of CatalogTableIdentifier in paths that do not yet support table
* identifiers with catalogs.
*/
class TableIdentifier(table: String, db: Option[String])
extends CatalogTableIdentifier(table, db, None) {

def this(table: String) = this(table, None)

override lazy val asTableIdentifier: TableIdentifier = this

override def copy(
name: String = this.table,
database: Option[String] = this.db,
catalog: Option[String] = None): TableIdentifier = {
assert(catalog.isEmpty, "Cannot add catalog to a TableIdentifier using copy")
new TableIdentifier(name, database)
}
}
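
The overridden copy keeps a plain TableIdentifier from silently acquiring a catalog; a sketch (not part of the diff; names are made up):

val t = TableIdentifier("t1", Some("db1"))
t.copy(name = "t2")             // TableIdentifier("t2", Some("db1"))
t.copy(catalog = Some("prod"))  // fails the assertion: cannot add a catalog via copy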

/** A fully qualified identifier for a table (i.e., database.tableName) */
@@ -84,19 +142,27 @@ case class QualifiedTableName(database: String, name: String) {
}

object TableIdentifier {
def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
def apply(table: String): TableIdentifier =
new TableIdentifier(table)

def apply(table: String, database: Option[String]): TableIdentifier =
new TableIdentifier(table, database)
}


/**
* Identifies a function in a database.
* If `database` is not defined, the current database is used.
* When we register a permanent function in the FunctionRegistry, we use
* unquotedString as the function name.
*/
case class FunctionIdentifier(funcName: String, database: Option[String])
extends IdentifierWithDatabase {
extends IdentifierWithOptionalDatabaseAndCatalog {

override val identifier: String = funcName

override val catalog: Option[String] = None

def this(funcName: String) = this(funcName, None)

override def toString: String = unquotedString
@@ -29,7 +29,7 @@ import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, TerminalNode}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.{CatalogTableIdentifier, FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions._
@@ -81,6 +81,11 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
visitTableIdentifier(ctx.tableIdentifier)
}

override def visitSingleCatalogTableIdentifier(
ctx: SingleCatalogTableIdentifierContext): CatalogTableIdentifier = withOrigin(ctx) {
visitCatalogTableIdentifier(ctx.catalogTableIdentifier)
}

override def visitSingleFunctionIdentifier(
ctx: SingleFunctionIdentifierContext): FunctionIdentifier = withOrigin(ctx) {
visitFunctionIdentifier(ctx.functionIdentifier)
@@ -945,6 +950,17 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
TableIdentifier(ctx.table.getText, Option(ctx.db).map(_.getText))
}

/**
* Create a [[CatalogTableIdentifier]] from a 'tableName', 'databaseName'.'tableName', or
* 'catalogName'.'databaseName'.'tableName' pattern.
*/
override def visitCatalogTableIdentifier(
ctx: CatalogTableIdentifierContext): CatalogTableIdentifier = withOrigin(ctx) {
CatalogTableIdentifier(
ctx.table.getText,
Option(ctx.db).map(_.getText),
Option(ctx.catalog).map(_.getText))
}

/**
* Create a [[FunctionIdentifier]] from a 'functionName' or 'databaseName'.'functionName' pattern.
*/
@@ -23,7 +23,7 @@ import org.antlr.v4.runtime.tree.TerminalNodeImpl

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.{CatalogTableIdentifier, FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.Origin
@@ -50,6 +50,13 @@ abstract class AbstractSqlParser extends ParserInterface with Logging {
astBuilder.visitSingleTableIdentifier(parser.singleTableIdentifier())
}

/** Creates a CatalogTableIdentifier for a given SQL string */
override def parseCatalogTableIdentifier(sqlText: String): CatalogTableIdentifier = {
parse(sqlText) { parser =>
astBuilder.visitSingleCatalogTableIdentifier(parser.singleCatalogTableIdentifier())
}
}
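
A usage sketch (not part of the diff) of the new entry point through CatalystSqlParser, which extends AbstractSqlParser; the identifier strings are made up:

CatalystSqlParser.parseCatalogTableIdentifier("t1")           // CatalogTableIdentifier("t1", None, None)
CatalystSqlParser.parseCatalogTableIdentifier("db1.t1")       // database = Some("db1")
CatalystSqlParser.parseCatalogTableIdentifier("prod.db1.t1")  // catalog = Some("prod"), database = Some("db1")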

/** Creates FunctionIdentifier for a given SQL string. */
override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
parse(sqlText) { parser =>
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.parser

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.{CatalogTableIdentifier, FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.{DataType, StructType}
@@ -46,6 +46,12 @@ trait ParserInterface {
@throws[ParseException]("Text cannot be parsed to a TableIdentifier")
def parseTableIdentifier(sqlText: String): TableIdentifier

/**
* Parse a string to a [[CatalogTableIdentifier]].
*/
@throws[ParseException]("Text cannot be parsed to a CatalogTableIdentifier")
def parseCatalogTableIdentifier(sqlText: String): CatalogTableIdentifier

/**
* Parse a string to a [[FunctionIdentifier]].
*/
@@ -675,7 +675,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
try {
val fieldNames = getConstructorParameterNames(p.getClass)
val fieldValues = p.productIterator.toSeq
assert(fieldNames.length == fieldValues.length)
assert(fieldNames.length <= fieldValues.length)
("product-class" -> JString(p.getClass.getName)) :: fieldNames.zip(fieldValues).map {
case (name, value) => name -> parseToJson(value)
}.toList
@@ -17,7 +17,7 @@
package org.apache.spark.sql

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.{CatalogTableIdentifier, FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, Literal}
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
Expand Down Expand Up @@ -138,6 +138,9 @@ case class MyParser(spark: SparkSession, delegate: ParserInterface) extends Pars
override def parseTableIdentifier(sqlText: String): TableIdentifier =
delegate.parseTableIdentifier(sqlText)

override def parseCatalogTableIdentifier(sqlText: String): CatalogTableIdentifier =
delegate.parseCatalogTableIdentifier(sqlText)

override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier =
delegate.parseFunctionIdentifier(sqlText)
