From f0796ec0783b386ef3530da73ac744e375986ca5 Mon Sep 17 00:00:00 2001 From: senmiaoliu Date: Tue, 4 Apr 2023 10:56:43 +0800 Subject: [PATCH] [KYUUBI #4522] `use:catalog` should execute before than `use:database` ### _Why are the changes needed?_ close #4522 ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request Closes #4648 from lsm1/fix/kyuubi_4522. Closes #4522 e06046899 [senmiaoliu] use foreach bd83d6623 [senmiaoliu] spilt narmalizedConf 4d8445aac [senmiaoliu] avoid sort eda34d480 [senmiaoliu] use catalog first Authored-by: senmiaoliu Signed-off-by: Cheng Pan --- .../flink/session/FlinkSessionImpl.scala | 45 +++++++++++-------- .../spark/session/SparkSessionImpl.scala | 41 ++++++++++------- .../trino/session/TrinoSessionImpl.scala | 8 +++- 3 files changed, 58 insertions(+), 36 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala index 75087b48ca2..a4b6a8a902b 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala @@ -57,25 +57,34 @@ class FlinkSessionImpl( override def open(): Unit = { executor.openSession(handle.identifier.toString) - normalizedConf.foreach { - case ("use:catalog", catalog) => - val tableEnv = sessionContext.getExecutionContext.getTableEnvironment - try { - tableEnv.useCatalog(catalog) - } catch { - case NonFatal(e) => + + val (useCatalogAndDatabaseConf, otherConf) = normalizedConf.partition { case (k, _) => + Array("use:catalog", "use:database").contains(k) + } + + useCatalogAndDatabaseConf.get("use:catalog").foreach { catalog => + val tableEnv = sessionContext.getExecutionContext.getTableEnvironment + try { + tableEnv.useCatalog(catalog) + } catch { + case NonFatal(e) => + throw e + } + } + + useCatalogAndDatabaseConf.get("use:database").foreach { database => + val tableEnv = sessionContext.getExecutionContext.getTableEnvironment + try { + tableEnv.useDatabase(database) + } catch { + case NonFatal(e) => + if (database != "default") { throw e - } - case ("use:database", database) => - val tableEnv = sessionContext.getExecutionContext.getTableEnvironment - try { - tableEnv.useDatabase(database) - } catch { - case NonFatal(e) => - if (database != "default") { - throw e - } - } + } + } + } + + otherConf.foreach { case (key, value) => setModifiableConfig(key, value) } super.open() diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala index 78164ff5fab..96fc43e857d 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala @@ -54,22 +54,31 @@ class SparkSessionImpl( private val sessionEvent = SessionEvent(this) override def open(): Unit = { - normalizedConf.foreach { - case ("use:catalog", catalog) => - try { - SparkCatalogShim().setCurrentCatalog(spark, catalog) - } catch { - case e if e.getMessage.contains("Cannot find catalog plugin class for catalog") => - warn(e.getMessage()) - } - case ("use:database", database) => - try { - SparkCatalogShim().setCurrentDatabase(spark, database) - } catch { - case e - if database == "default" && e.getMessage != null && - e.getMessage.contains("not found") => - } + + val (useCatalogAndDatabaseConf, otherConf) = normalizedConf.partition { case (k, _) => + Array("use:catalog", "use:database").contains(k) + } + + useCatalogAndDatabaseConf.get("use:catalog").foreach { catalog => + try { + SparkCatalogShim().setCurrentCatalog(spark, catalog) + } catch { + case e if e.getMessage.contains("Cannot find catalog plugin class for catalog") => + warn(e.getMessage()) + } + } + + useCatalogAndDatabaseConf.get("use:database").foreach { database => + try { + SparkCatalogShim().setCurrentDatabase(spark, database) + } catch { + case e + if database == "default" && e.getMessage != null && + e.getMessage.contains("not found") => + } + } + + otherConf.foreach { case (key, value) => setModifiableConfig(key, value) } KDFRegistry.registerAll(spark) diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala index 81f973b1b5e..1a96bed73f5 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala @@ -57,10 +57,14 @@ class TrinoSessionImpl( private val sessionEvent = TrinoSessionEvent(this) override def open(): Unit = { - normalizedConf.foreach { + + val (useCatalogAndDatabaseConf, _) = normalizedConf.partition { case (k, _) => + Array("use:catalog", "use:database").contains(k) + } + + useCatalogAndDatabaseConf.foreach { case ("use:catalog", catalog) => catalogName = catalog case ("use:database", database) => databaseName = database - case _ => // do nothing } val httpClient = new OkHttpClient.Builder().build()