Skip to content

[SPARK-14014][SQL] Integrate session catalog (attempt #2) #11938

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion R/pkg/inst/tests/testthat/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -1817,7 +1817,8 @@ test_that("approxQuantile() on a DataFrame", {

test_that("SQL error message is returned from JVM", {
retError <- tryCatch(sql(sqlContext, "select * from blah"), error = function(e) e)
expect_equal(grepl("Table not found: blah", retError), TRUE)
expect_equal(grepl("Table not found", retError), TRUE)
expect_equal(grepl("blah", retError), TRUE)
})

irisDF <- suppressWarnings(createDataFrame(sqlContext, iris))
Expand Down
3 changes: 3 additions & 0 deletions project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,9 @@ object MimaExcludes {
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.Logging.initializeLogIfNecessary"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerEvent.logEvent"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.OutputWriterFactory.newInstance")
) ++ Seq(
// [SPARK-14014] Replace existing analysis.Catalog with SessionCatalog
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.this")
) ++ Seq(
// [SPARK-13928] Move org.apache.spark.Logging into org.apache.spark.internal.Logging
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Logging"),
Expand Down
2 changes: 1 addition & 1 deletion python/pyspark/sql/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ def tableNames(self, dbName=None):
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> "table1" in sqlContext.tableNames()
True
>>> "table1" in sqlContext.tableNames("db")
>>> "table1" in sqlContext.tableNames("default")
True
"""
if dbName is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.encoders.OuterScopes
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
Expand All @@ -36,23 +37,22 @@ import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.types._

/**
* A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing
* when all relations are already filled in and the analyzer needs only to resolve attribute
* references.
* A trivial [[Analyzer]] with an dummy [[SessionCatalog]] and [[EmptyFunctionRegistry]].
* Used for testing when all relations are already filled in and the analyzer needs only
* to resolve attribute references.
*/
object SimpleAnalyzer
extends Analyzer(
EmptyCatalog,
EmptyFunctionRegistry,
new SimpleCatalystConf(caseSensitiveAnalysis = true))
extends SimpleAnalyzer(new SimpleCatalystConf(caseSensitiveAnalysis = true))
class SimpleAnalyzer(conf: CatalystConf)
extends Analyzer(new SessionCatalog(new InMemoryCatalog, conf), EmptyFunctionRegistry, conf)

/**
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
* [[UnresolvedRelation]]s into fully typed objects using information in a schema [[Catalog]] and
* a [[FunctionRegistry]].
* [[UnresolvedRelation]]s into fully typed objects using information in a
* [[SessionCatalog]] and a [[FunctionRegistry]].
*/
class Analyzer(
catalog: Catalog,
catalog: SessionCatalog,
registry: FunctionRegistry,
conf: CatalystConf,
maxIterations: Int = 100)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
errors.TreeNodeException(tree, s"Invalid call to $function on unresolved object", null)

/**
* Holds the name of a relation that has yet to be looked up in a [[Catalog]].
* Holds the name of a relation that has yet to be looked up in a catalog.
*/
case class UnresolvedRelation(
tableIdentifier: TableIdentifier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,37 +52,34 @@ class InMemoryCatalog extends ExternalCatalog {
names.filter { funcName => regex.pattern.matcher(funcName).matches() }
}

private def existsFunction(db: String, funcName: String): Boolean = {
private def functionExists(db: String, funcName: String): Boolean = {
requireDbExists(db)
catalog(db).functions.contains(funcName)
}

private def existsTable(db: String, table: String): Boolean = {
requireDbExists(db)
catalog(db).tables.contains(table)
}

private def existsPartition(db: String, table: String, spec: TablePartitionSpec): Boolean = {
private def partitionExists(db: String, table: String, spec: TablePartitionSpec): Boolean = {
requireTableExists(db, table)
catalog(db).tables(table).partitions.contains(spec)
}

private def requireFunctionExists(db: String, funcName: String): Unit = {
if (!existsFunction(db, funcName)) {
throw new AnalysisException(s"Function '$funcName' does not exist in database '$db'")
if (!functionExists(db, funcName)) {
throw new AnalysisException(
s"Function not found: '$funcName' does not exist in database '$db'")
}
}

private def requireTableExists(db: String, table: String): Unit = {
if (!existsTable(db, table)) {
throw new AnalysisException(s"Table '$table' does not exist in database '$db'")
if (!tableExists(db, table)) {
throw new AnalysisException(
s"Table not found: '$table' does not exist in database '$db'")
}
}

private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = {
if (!existsPartition(db, table, spec)) {
if (!partitionExists(db, table, spec)) {
throw new AnalysisException(
s"Partition does not exist in database '$db' table '$table': '$spec'")
s"Partition not found: database '$db' table '$table' does not contain: '$spec'")
}
}

Expand Down Expand Up @@ -159,7 +156,7 @@ class InMemoryCatalog extends ExternalCatalog {
ignoreIfExists: Boolean): Unit = synchronized {
requireDbExists(db)
val table = tableDefinition.name.table
if (existsTable(db, table)) {
if (tableExists(db, table)) {
if (!ignoreIfExists) {
throw new AnalysisException(s"Table '$table' already exists in database '$db'")
}
Expand All @@ -173,7 +170,7 @@ class InMemoryCatalog extends ExternalCatalog {
table: String,
ignoreIfNotExists: Boolean): Unit = synchronized {
requireDbExists(db)
if (existsTable(db, table)) {
if (tableExists(db, table)) {
catalog(db).tables.remove(table)
} else {
if (!ignoreIfNotExists) {
Expand All @@ -200,13 +197,17 @@ class InMemoryCatalog extends ExternalCatalog {
catalog(db).tables(table).table
}

override def tableExists(db: String, table: String): Boolean = synchronized {
requireDbExists(db)
catalog(db).tables.contains(table)
}

override def listTables(db: String): Seq[String] = synchronized {
requireDbExists(db)
catalog(db).tables.keySet.toSeq
}

override def listTables(db: String, pattern: String): Seq[String] = synchronized {
requireDbExists(db)
filterPattern(listTables(db), pattern)
}

Expand Down Expand Up @@ -295,7 +296,7 @@ class InMemoryCatalog extends ExternalCatalog {

override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
requireDbExists(db)
if (existsFunction(db, func.name.funcName)) {
if (functionExists(db, func.name.funcName)) {
throw new AnalysisException(s"Function '$func' already exists in '$db' database")
} else {
catalog(db).functions.put(func.name.funcName, func)
Expand Down
Loading