diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala index 03d9ce42e7f..953cd73025a 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala @@ -53,25 +53,34 @@ class FlinkSessionImpl( override def open(): Unit = { executor.openSession(handle.identifier.toString) - normalizedConf.foreach { - case ("use:catalog", catalog) => - val tableEnv = sessionContext.getExecutionContext.getTableEnvironment - try { - tableEnv.useCatalog(catalog) - } catch { - case NonFatal(e) => + + val (useCatalogAndDatabaseConf, otherConf) = normalizedConf.partition { case (k, _) => + Array("use:catalog", "use:database").contains(k) + } + + useCatalogAndDatabaseConf.get("use:catalog").foreach { catalog => + val tableEnv = sessionContext.getExecutionContext.getTableEnvironment + try { + tableEnv.useCatalog(catalog) + } catch { + case NonFatal(e) => + throw e + } + } + + useCatalogAndDatabaseConf.get("use:database").foreach { database => + val tableEnv = sessionContext.getExecutionContext.getTableEnvironment + try { + tableEnv.useDatabase(database) + } catch { + case NonFatal(e) => + if (database != "default") { throw e - } - case ("use:database", database) => - val tableEnv = sessionContext.getExecutionContext.getTableEnvironment - try { - tableEnv.useDatabase(database) - } catch { - case NonFatal(e) => - if (database != "default") { - throw e - } - } + } + } + } + + otherConf.foreach { case (key, value) => setModifiableConfig(key, value) } super.open() diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala index 78164ff5fab..96fc43e857d 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala @@ -54,22 +54,31 @@ class SparkSessionImpl( private val sessionEvent = SessionEvent(this) override def open(): Unit = { - normalizedConf.foreach { - case ("use:catalog", catalog) => - try { - SparkCatalogShim().setCurrentCatalog(spark, catalog) - } catch { - case e if e.getMessage.contains("Cannot find catalog plugin class for catalog") => - warn(e.getMessage()) - } - case ("use:database", database) => - try { - SparkCatalogShim().setCurrentDatabase(spark, database) - } catch { - case e - if database == "default" && e.getMessage != null && - e.getMessage.contains("not found") => - } + + val (useCatalogAndDatabaseConf, otherConf) = normalizedConf.partition { case (k, _) => + Array("use:catalog", "use:database").contains(k) + } + + useCatalogAndDatabaseConf.get("use:catalog").foreach { catalog => + try { + SparkCatalogShim().setCurrentCatalog(spark, catalog) + } catch { + case e if e.getMessage.contains("Cannot find catalog plugin class for catalog") => + warn(e.getMessage()) + } + } + + useCatalogAndDatabaseConf.get("use:database").foreach { database => + try { + SparkCatalogShim().setCurrentDatabase(spark, database) + } catch { + case e + if database == "default" && e.getMessage != null && + e.getMessage.contains("not found") => + } + } + + otherConf.foreach { case (key, value) => setModifiableConfig(key, value) } KDFRegistry.registerAll(spark) diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala index a19d74d586c..6bd3b6dae2f 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala @@ -53,10 +53,14 @@ class TrinoSessionImpl( private val sessionEvent = TrinoSessionEvent(this) override def open(): Unit = { - normalizedConf.foreach { + + val (useCatalogAndDatabaseConf, _) = normalizedConf.partition { case (k, _) => + Array("use:catalog", "use:database").contains(k) + } + + useCatalogAndDatabaseConf.foreach { case ("use:catalog", catalog) => catalogName = catalog case ("use:database", database) => databaseName = database - case _ => // do nothing } val httpClient = new OkHttpClient.Builder().build()