Skip to content

[SPARK-13923] [SQL] Implement SessionCatalog #11750

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
import scala.collection.mutable

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}


/**
Expand Down Expand Up @@ -68,19 +69,20 @@ class InMemoryCatalog extends ExternalCatalog {

private def requireFunctionExists(db: String, funcName: String): Unit = {
if (!existsFunction(db, funcName)) {
throw new AnalysisException(s"Function $funcName does not exist in $db database")
throw new AnalysisException(s"Function '$funcName' does not exist in database '$db'")
}
}

private def requireTableExists(db: String, table: String): Unit = {
if (!existsTable(db, table)) {
throw new AnalysisException(s"Table $table does not exist in $db database")
throw new AnalysisException(s"Table '$table' does not exist in database '$db'")
}
}

private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = {
if (!existsPartition(db, table, spec)) {
throw new AnalysisException(s"Partition does not exist in database $db table $table: $spec")
throw new AnalysisException(
s"Partition does not exist in database '$db' table '$table': '$spec'")
}
}

Expand All @@ -93,7 +95,7 @@ class InMemoryCatalog extends ExternalCatalog {
ignoreIfExists: Boolean): Unit = synchronized {
if (catalog.contains(dbDefinition.name)) {
if (!ignoreIfExists) {
throw new AnalysisException(s"Database ${dbDefinition.name} already exists.")
throw new AnalysisException(s"Database '${dbDefinition.name}' already exists.")
}
} else {
catalog.put(dbDefinition.name, new DatabaseDesc(dbDefinition))
Expand All @@ -108,17 +110,17 @@ class InMemoryCatalog extends ExternalCatalog {
if (!cascade) {
// If cascade is false, make sure the database is empty.
if (catalog(db).tables.nonEmpty) {
throw new AnalysisException(s"Database $db is not empty. One or more tables exist.")
throw new AnalysisException(s"Database '$db' is not empty. One or more tables exist.")
}
if (catalog(db).functions.nonEmpty) {
throw new AnalysisException(s"Database $db is not empty. One or more functions exist.")
throw new AnalysisException(s"Database '$db' is not empty. One or more functions exist.")
}
}
// Remove the database.
catalog.remove(db)
} else {
if (!ignoreIfNotExists) {
throw new AnalysisException(s"Database $db does not exist")
throw new AnalysisException(s"Database '$db' does not exist")
}
}
}
Expand Down Expand Up @@ -156,12 +158,13 @@ class InMemoryCatalog extends ExternalCatalog {
tableDefinition: CatalogTable,
ignoreIfExists: Boolean): Unit = synchronized {
requireDbExists(db)
if (existsTable(db, tableDefinition.name)) {
val table = tableDefinition.name.table
if (existsTable(db, table)) {
if (!ignoreIfExists) {
throw new AnalysisException(s"Table ${tableDefinition.name} already exists in $db database")
throw new AnalysisException(s"Table '$table' already exists in database '$db'")
}
} else {
catalog(db).tables.put(tableDefinition.name, new TableDesc(tableDefinition))
catalog(db).tables.put(table, new TableDesc(tableDefinition))
}
}

Expand All @@ -174,22 +177,22 @@ class InMemoryCatalog extends ExternalCatalog {
catalog(db).tables.remove(table)
} else {
if (!ignoreIfNotExists) {
throw new AnalysisException(s"Table $table does not exist in $db database")
throw new AnalysisException(s"Table '$table' does not exist in database '$db'")
}
}
}

override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
requireTableExists(db, oldName)
val oldDesc = catalog(db).tables(oldName)
oldDesc.table = oldDesc.table.copy(name = newName)
oldDesc.table = oldDesc.table.copy(name = TableIdentifier(newName, Some(db)))
catalog(db).tables.put(newName, oldDesc)
catalog(db).tables.remove(oldName)
}

override def alterTable(db: String, tableDefinition: CatalogTable): Unit = synchronized {
requireTableExists(db, tableDefinition.name)
catalog(db).tables(tableDefinition.name).table = tableDefinition
requireTableExists(db, tableDefinition.name.table)
catalog(db).tables(tableDefinition.name.table).table = tableDefinition
}

override def getTable(db: String, table: String): CatalogTable = synchronized {
Expand Down Expand Up @@ -222,8 +225,8 @@ class InMemoryCatalog extends ExternalCatalog {
val dupSpecs = parts.collect { case p if existingParts.contains(p.spec) => p.spec }
if (dupSpecs.nonEmpty) {
val dupSpecsStr = dupSpecs.mkString("\n===\n")
throw new AnalysisException(
s"The following partitions already exist in database $db table $table:\n$dupSpecsStr")
throw new AnalysisException("The following partitions already exist in database " +
s"'$db' table '$table':\n$dupSpecsStr")
}
}
parts.foreach { p => existingParts.put(p.spec, p) }
Expand All @@ -240,8 +243,8 @@ class InMemoryCatalog extends ExternalCatalog {
val missingSpecs = partSpecs.collect { case s if !existingParts.contains(s) => s }
if (missingSpecs.nonEmpty) {
val missingSpecsStr = missingSpecs.mkString("\n===\n")
throw new AnalysisException(
s"The following partitions do not exist in database $db table $table:\n$missingSpecsStr")
throw new AnalysisException("The following partitions do not exist in database " +
s"'$db' table '$table':\n$missingSpecsStr")
}
}
partSpecs.foreach(existingParts.remove)
Expand Down Expand Up @@ -292,10 +295,10 @@ class InMemoryCatalog extends ExternalCatalog {

override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
requireDbExists(db)
if (existsFunction(db, func.name)) {
throw new AnalysisException(s"Function $func already exists in $db database")
if (existsFunction(db, func.name.funcName)) {
throw new AnalysisException(s"Function '$func' already exists in '$db' database")
} else {
catalog(db).functions.put(func.name, func)
catalog(db).functions.put(func.name.funcName, func)
}
}

Expand All @@ -306,14 +309,14 @@ class InMemoryCatalog extends ExternalCatalog {

override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized {
requireFunctionExists(db, oldName)
val newFunc = getFunction(db, oldName).copy(name = newName)
val newFunc = getFunction(db, oldName).copy(name = FunctionIdentifier(newName, Some(db)))
catalog(db).functions.remove(oldName)
catalog(db).functions.put(newName, newFunc)
}

override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = synchronized {
requireFunctionExists(db, funcDefinition.name)
catalog(db).functions.put(funcDefinition.name, funcDefinition)
requireFunctionExists(db, funcDefinition.name.funcName)
catalog(db).functions.put(funcDefinition.name.funcName, funcDefinition)
}

override def getFunction(db: String, funcName: String): CatalogFunction = synchronized {
Expand Down
Loading