Skip to content

Commit 20ddf5f

Browse files
author
Andrew Or
committed
[SPARK-14014][SQL] Integrate session catalog (attempt #2)
## What changes were proposed in this pull request?

This reopens apache#11836, which was merged but promptly reverted because it introduced flaky Hive tests.

## How was this patch tested?

See `CatalogTestCases`, `SessionCatalogSuite` and `HiveContextSuite`.

Author: Andrew Or <andrew@databricks.com>

Closes apache#11938 from andrewor14/session-catalog-again.
1 parent 1c70b76 commit 20ddf5f

File tree

61 files changed

+1043
-816
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+1043
-816
lines changed

R/pkg/inst/tests/testthat/test_sparkSQL.R

+2-1
Original file line numberDiff line numberDiff line change
@@ -1817,7 +1817,8 @@ test_that("approxQuantile() on a DataFrame", {
18171817

18181818
test_that("SQL error message is returned from JVM", {
18191819
retError <- tryCatch(sql(sqlContext, "select * from blah"), error = function(e) e)
1820-
expect_equal(grepl("Table not found: blah", retError), TRUE)
1820+
expect_equal(grepl("Table not found", retError), TRUE)
1821+
expect_equal(grepl("blah", retError), TRUE)
18211822
})
18221823

18231824
irisDF <- suppressWarnings(createDataFrame(sqlContext, iris))

project/MimaExcludes.scala

+3
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,9 @@ object MimaExcludes {
562562
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.Logging.initializeLogIfNecessary"),
563563
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerEvent.logEvent"),
564564
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.OutputWriterFactory.newInstance")
565+
) ++ Seq(
566+
// [SPARK-14014] Replace existing analysis.Catalog with SessionCatalog
567+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.this")
565568
) ++ Seq(
566569
// [SPARK-13928] Move org.apache.spark.Logging into org.apache.spark.internal.Logging
567570
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Logging"),

python/pyspark/sql/context.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,7 @@ def tableNames(self, dbName=None):
554554
>>> sqlContext.registerDataFrameAsTable(df, "table1")
555555
>>> "table1" in sqlContext.tableNames()
556556
True
557-
>>> "table1" in sqlContext.tableNames("db")
557+
>>> "table1" in sqlContext.tableNames("default")
558558
True
559559
"""
560560
if dbName is None:

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

+10-10
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
2424

2525
import org.apache.spark.sql.AnalysisException
2626
import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf}
27+
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
2728
import org.apache.spark.sql.catalyst.encoders.OuterScopes
2829
import org.apache.spark.sql.catalyst.expressions._
2930
import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -36,23 +37,22 @@ import org.apache.spark.sql.catalyst.util.usePrettyExpression
3637
import org.apache.spark.sql.types._
3738

3839
/**
39-
* A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing
40-
* when all relations are already filled in and the analyzer needs only to resolve attribute
41-
* references.
40+
* A trivial [[Analyzer]] with a dummy [[SessionCatalog]] and [[EmptyFunctionRegistry]].
41+
* Used for testing when all relations are already filled in and the analyzer needs only
42+
* to resolve attribute references.
4243
*/
4344
object SimpleAnalyzer
44-
extends Analyzer(
45-
EmptyCatalog,
46-
EmptyFunctionRegistry,
47-
new SimpleCatalystConf(caseSensitiveAnalysis = true))
45+
extends SimpleAnalyzer(new SimpleCatalystConf(caseSensitiveAnalysis = true))
46+
class SimpleAnalyzer(conf: CatalystConf)
47+
extends Analyzer(new SessionCatalog(new InMemoryCatalog, conf), EmptyFunctionRegistry, conf)
4848

4949
/**
5050
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
51-
* [[UnresolvedRelation]]s into fully typed objects using information in a schema [[Catalog]] and
52-
* a [[FunctionRegistry]].
51+
* [[UnresolvedRelation]]s into fully typed objects using information in a
52+
* [[SessionCatalog]] and a [[FunctionRegistry]].
5353
*/
5454
class Analyzer(
55-
catalog: Catalog,
55+
catalog: SessionCatalog,
5656
registry: FunctionRegistry,
5757
conf: CatalystConf,
5858
maxIterations: Int = 100)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala

-218
This file was deleted.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
3434
errors.TreeNodeException(tree, s"Invalid call to $function on unresolved object", null)
3535

3636
/**
37-
* Holds the name of a relation that has yet to be looked up in a [[Catalog]].
37+
* Holds the name of a relation that has yet to be looked up in a catalog.
3838
*/
3939
case class UnresolvedRelation(
4040
tableIdentifier: TableIdentifier,

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

+18-17
Original file line numberDiff line numberDiff line change
@@ -52,37 +52,34 @@ class InMemoryCatalog extends ExternalCatalog {
5252
names.filter { funcName => regex.pattern.matcher(funcName).matches() }
5353
}
5454

55-
private def existsFunction(db: String, funcName: String): Boolean = {
55+
private def functionExists(db: String, funcName: String): Boolean = {
5656
requireDbExists(db)
5757
catalog(db).functions.contains(funcName)
5858
}
5959

60-
private def existsTable(db: String, table: String): Boolean = {
61-
requireDbExists(db)
62-
catalog(db).tables.contains(table)
63-
}
64-
65-
private def existsPartition(db: String, table: String, spec: TablePartitionSpec): Boolean = {
60+
private def partitionExists(db: String, table: String, spec: TablePartitionSpec): Boolean = {
6661
requireTableExists(db, table)
6762
catalog(db).tables(table).partitions.contains(spec)
6863
}
6964

7065
private def requireFunctionExists(db: String, funcName: String): Unit = {
71-
if (!existsFunction(db, funcName)) {
72-
throw new AnalysisException(s"Function '$funcName' does not exist in database '$db'")
66+
if (!functionExists(db, funcName)) {
67+
throw new AnalysisException(
68+
s"Function not found: '$funcName' does not exist in database '$db'")
7369
}
7470
}
7571

7672
private def requireTableExists(db: String, table: String): Unit = {
77-
if (!existsTable(db, table)) {
78-
throw new AnalysisException(s"Table '$table' does not exist in database '$db'")
73+
if (!tableExists(db, table)) {
74+
throw new AnalysisException(
75+
s"Table not found: '$table' does not exist in database '$db'")
7976
}
8077
}
8178

8279
private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = {
83-
if (!existsPartition(db, table, spec)) {
80+
if (!partitionExists(db, table, spec)) {
8481
throw new AnalysisException(
85-
s"Partition does not exist in database '$db' table '$table': '$spec'")
82+
s"Partition not found: database '$db' table '$table' does not contain: '$spec'")
8683
}
8784
}
8885

@@ -159,7 +156,7 @@ class InMemoryCatalog extends ExternalCatalog {
159156
ignoreIfExists: Boolean): Unit = synchronized {
160157
requireDbExists(db)
161158
val table = tableDefinition.name.table
162-
if (existsTable(db, table)) {
159+
if (tableExists(db, table)) {
163160
if (!ignoreIfExists) {
164161
throw new AnalysisException(s"Table '$table' already exists in database '$db'")
165162
}
@@ -173,7 +170,7 @@ class InMemoryCatalog extends ExternalCatalog {
173170
table: String,
174171
ignoreIfNotExists: Boolean): Unit = synchronized {
175172
requireDbExists(db)
176-
if (existsTable(db, table)) {
173+
if (tableExists(db, table)) {
177174
catalog(db).tables.remove(table)
178175
} else {
179176
if (!ignoreIfNotExists) {
@@ -200,13 +197,17 @@ class InMemoryCatalog extends ExternalCatalog {
200197
catalog(db).tables(table).table
201198
}
202199

200+
override def tableExists(db: String, table: String): Boolean = synchronized {
201+
requireDbExists(db)
202+
catalog(db).tables.contains(table)
203+
}
204+
203205
override def listTables(db: String): Seq[String] = synchronized {
204206
requireDbExists(db)
205207
catalog(db).tables.keySet.toSeq
206208
}
207209

208210
override def listTables(db: String, pattern: String): Seq[String] = synchronized {
209-
requireDbExists(db)
210211
filterPattern(listTables(db), pattern)
211212
}
212213

@@ -295,7 +296,7 @@ class InMemoryCatalog extends ExternalCatalog {
295296

296297
override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
297298
requireDbExists(db)
298-
if (existsFunction(db, func.name.funcName)) {
299+
if (functionExists(db, func.name.funcName)) {
299300
throw new AnalysisException(s"Function '$func' already exists in '$db' database")
300301
} else {
301302
catalog(db).functions.put(func.name.funcName, func)

0 commit comments

Comments (0)