Skip to content

Commit cbb4e7d

Browse files
zachschuermannzhengruifeng
authored andcommitted
[SPARK-39646][SQL] Make setCurrentDatabase compatible with 3 layer namespace
### What changes were proposed in this pull request? Change `setCurrentDatabase` catalog API to support 3 layer namespace. We use `sparkSession.sessionState.catalogManager.currentNamespace` for the currentDatabase now. ### Why are the changes needed? `setCurrentDatabase` doesn't support 3 layer namespace. ### Does this PR introduce _any_ user-facing change? Yes. This PR introduces a backwards-compatible API change to support 3 layer namespace (e.g. catalog.database.table) for `setCurrentDatabase`. ### How was this patch tested? UT Closes #36969 from schuermannator/3l-setCurrentDatabse. Authored-by: Zach Schuermann <zachary.zvs@gmail.com> Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
1 parent 6624d91 commit cbb4e7d

File tree

3 files changed

+37
-5
lines changed

3 files changed

+37
-5
lines changed

R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4015,8 +4015,8 @@ test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases", {
40154015
expect_equal(currentDatabase(), "default")
40164016
expect_error(setCurrentDatabase("default"), NA)
40174017
expect_error(setCurrentDatabase("zxwtyswklpf"),
4018-
paste0("Error in setCurrentDatabase : analysis error - Database ",
4019-
"'zxwtyswklpf' does not exist"))
4018+
paste0("Error in setCurrentDatabase : no such database - Database ",
4019+
"'zxwtyswklpf' not found"))
40204020
dbs <- collect(listDatabases())
40214021
expect_equal(names(dbs), c("name", "catalog", "description", "locationUri"))
40224022
expect_equal(which(dbs[, 1] == "default"), 1)

sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,18 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
5959
/**
6060
* Returns the current default database in this session.
6161
*/
62-
override def currentDatabase: String = sessionCatalog.getCurrentDatabase
62+
override def currentDatabase: String =
63+
sparkSession.sessionState.catalogManager.currentNamespace.toSeq.quoted
6364

6465
/**
6566
* Sets the current default database in this session.
6667
*/
6768
@throws[AnalysisException]("database does not exist")
6869
override def setCurrentDatabase(dbName: String): Unit = {
69-
requireDatabaseExists(dbName)
70-
sessionCatalog.setCurrentDatabase(dbName)
70+
// we assume dbName will not include the catalog prefix. e.g. if you call
71+
// setCurrentDatabase("catalog.db") it will search for a database catalog.db in the catalog.
72+
val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
73+
sparkSession.sessionState.catalogManager.setCurrentNamespace(ident.toArray)
7174
}
7275

7376
/**

sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -866,4 +866,33 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
866866
sql(s"CREATE NAMESPACE $qualified")
867867
assert(spark.catalog.getDatabase(qualified).name === db)
868868
}
869+
870+
test("three layer namespace compatibility - set current database") {
871+
spark.catalog.setCurrentCatalog("testcat")
872+
// namespace with the same name as catalog
873+
sql("CREATE NAMESPACE testcat.testcat.my_db")
874+
spark.catalog.setCurrentDatabase("testcat.my_db")
875+
assert(spark.catalog.currentDatabase == "testcat.my_db")
876+
// sessionCatalog still reports 'default' as current database
877+
assert(sessionCatalog.getCurrentDatabase == "default")
878+
val e = intercept[AnalysisException] {
879+
spark.catalog.setCurrentDatabase("my_db")
880+
}.getMessage
881+
assert(e.contains("my_db"))
882+
883+
// check that we can fall back to old sessionCatalog
884+
createDatabase("hive_db")
885+
val err = intercept[AnalysisException] {
886+
spark.catalog.setCurrentDatabase("hive_db")
887+
}.getMessage
888+
assert(err.contains("hive_db"))
889+
spark.catalog.setCurrentCatalog("spark_catalog")
890+
spark.catalog.setCurrentDatabase("hive_db")
891+
assert(spark.catalog.currentDatabase == "hive_db")
892+
assert(sessionCatalog.getCurrentDatabase == "hive_db")
893+
val e3 = intercept[AnalysisException] {
894+
spark.catalog.setCurrentDatabase("unknown_db")
895+
}.getMessage
896+
assert(e3.contains("unknown_db"))
897+
}
869898
}

0 commit comments

Comments
 (0)