Skip to content

Commit

Permalink
avoid sort
Browse files Browse the repository at this point in the history
  • Loading branch information
lsm1 committed Apr 3, 2023
1 parent eda34d4 commit 4d8445a
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,20 @@ class FlinkSessionImpl(

override def open(): Unit = {
executor.openSession(handle.identifier.toString)
normalizedConf.toSeq.sortBy(!_._1.equals("use:catalog")).foreach {
case ("use:catalog", catalog) =>
normalizedConf.get("use:catalog") match {
case Some(catalog) =>
val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
try {
tableEnv.useCatalog(catalog)
} catch {
case NonFatal(e) =>
throw e
}
case ("use:database", database) =>
case None =>
}

normalizedConf.get("use:database") match {
case Some(database) =>
val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
try {
tableEnv.useDatabase(database)
Expand All @@ -76,6 +80,9 @@ class FlinkSessionImpl(
throw e
}
}
case None =>
}
normalizedConf.filterKeys(key => !Set("use:catalog", "use:database").contains(key)).foreach {
case (key, value) => setModifiableConfig(key, value)
}
super.open()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,30 @@ class SparkSessionImpl(
private val sessionEvent = SessionEvent(this)

override def open(): Unit = {
normalizedConf.toSeq.sortBy(!_._1.equals("use:catalog")).foreach {
case ("use:catalog", catalog) =>
normalizedConf.get("use:catalog") match {
case Some(catalog) =>
try {
SparkCatalogShim().setCurrentCatalog(spark, catalog)
} catch {
case e if e.getMessage.contains("Cannot find catalog plugin class for catalog") =>
warn(e.getMessage())
}
case ("use:database", database) =>
case None =>
}

normalizedConf.get("use:database") match {
case Some(database) =>
try {
SparkCatalogShim().setCurrentDatabase(spark, database)
} catch {
case e
if database == "default" && e.getMessage != null &&
e.getMessage.contains("not found") =>
}
case None =>
}

normalizedConf.filterKeys(key => !Set("use:catalog", "use:database").contains(key)).foreach {
case (key, value) => setModifiableConfig(key, value)
}
KDFRegistry.registerAll(spark)
Expand Down

0 comments on commit 4d8445a

Please sign in to comment.