Skip to content

Commit

Permalink
use foreach
Browse files Browse the repository at this point in the history
  • Loading branch information
lsm1 committed Apr 3, 2023
1 parent bd83d66 commit e060468
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,30 +62,26 @@ class FlinkSessionImpl(
Array("use:catalog", "use:database").contains(k)
}

useCatalogAndDatabaseConf.get("use:catalog") match {
case Some(catalog) =>
val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
try {
tableEnv.useCatalog(catalog)
} catch {
case NonFatal(e) =>
throw e
}
case None =>
useCatalogAndDatabaseConf.get("use:catalog").foreach { catalog =>
val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
try {
tableEnv.useCatalog(catalog)
} catch {
case NonFatal(e) =>
throw e
}
}

useCatalogAndDatabaseConf.get("use:database") match {
case Some(database) =>
val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
try {
tableEnv.useDatabase(database)
} catch {
case NonFatal(e) =>
if (database != "default") {
throw e
}
}
case None =>
useCatalogAndDatabaseConf.get("use:database").foreach { database =>
val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
try {
tableEnv.useDatabase(database)
} catch {
case NonFatal(e) =>
if (database != "default") {
throw e
}
}
}

otherConf.foreach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,27 +59,23 @@ class SparkSessionImpl(
Array("use:catalog", "use:database").contains(k)
}

useCatalogAndDatabaseConf.get("use:catalog") match {
case Some(catalog) =>
try {
SparkCatalogShim().setCurrentCatalog(spark, catalog)
} catch {
case e if e.getMessage.contains("Cannot find catalog plugin class for catalog") =>
warn(e.getMessage())
}
case None =>
useCatalogAndDatabaseConf.get("use:catalog").foreach { catalog =>
try {
SparkCatalogShim().setCurrentCatalog(spark, catalog)
} catch {
case e if e.getMessage.contains("Cannot find catalog plugin class for catalog") =>
warn(e.getMessage())
}
}

useCatalogAndDatabaseConf.get("use:database") match {
case Some(database) =>
try {
SparkCatalogShim().setCurrentDatabase(spark, database)
} catch {
case e
if database == "default" && e.getMessage != null &&
e.getMessage.contains("not found") =>
}
case None =>
useCatalogAndDatabaseConf.get("use:database").foreach { database =>
try {
SparkCatalogShim().setCurrentDatabase(spark, database)
} catch {
case e
if database == "default" && e.getMessage != null &&
e.getMessage.contains("not found") =>
}
}

otherConf.foreach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,14 @@ class TrinoSessionImpl(
private val sessionEvent = TrinoSessionEvent(this)

override def open(): Unit = {
normalizedConf.foreach {

val (useCatalogAndDatabaseConf, _) = normalizedConf.partition { case (k, _) =>
Array("use:catalog", "use:database").contains(k)
}

useCatalogAndDatabaseConf.foreach {
case ("use:catalog", catalog) => catalogName = catalog
case ("use:database", database) => databaseName = database
case _ => // do nothing
}

val httpClient = new OkHttpClient.Builder().build()
Expand Down

0 comments on commit e060468

Please sign in to comment.