Skip to content

Commit

Permalink
add common method to get session level config
Browse files Browse the repository at this point in the history
  • Loading branch information
davidyuan1223 committed Oct 20, 2023
1 parent c4cdf18 commit 03e2887
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.util.kvstore.KVIndex

import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.ConfigEntry
import org.apache.kyuubi.util.SemanticVersion

object KyuubiSparkUtil extends Logging {
Expand Down Expand Up @@ -98,4 +99,18 @@ object KyuubiSparkUtil extends Logging {
// Given that we are on the Spark SQL engine side, the [[org.apache.spark.SPARK_VERSION]] can be
// represented as the runtime version of the Spark SQL engine.
lazy val SPARK_ENGINE_RUNTIME_VERSION: SemanticVersion = SemanticVersion(SPARK_VERSION)

/**
* Get session level config value
* @param configEntry configEntry
* @param spark sparkSession
* @tparam T any type
* @return session level config value, if spark not set this config,
* default return kyuubi's config
*/
def getSessionConf[T](configEntry: ConfigEntry[T], spark: SparkSession): T = {
spark.conf.getOption(configEntry.key).map(configEntry.valueConverter).getOrElse {
SparkSQLEngine.kyuubiConf.get(configEntry)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_SHOW_PROGRESS, ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT, ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL}
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_SQL_EXECUTION_ID_KEY
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{getSessionConf, SPARK_SQL_EXECUTION_ID_KEY}
import org.apache.kyuubi.engine.spark.operation.ExecuteStatement
import org.apache.kyuubi.operation.Operation
import org.apache.kyuubi.operation.log.OperationLog
Expand All @@ -48,14 +48,13 @@ class SQLOperationListener(
private lazy val activeStages = new ConcurrentHashMap[SparkStageAttempt, SparkStageInfo]()
private var executionId: Option[Long] = None

private val conf: KyuubiConf = operation.getSession.sessionManager.getConf
private lazy val consoleProgressBar =
if (conf.get(ENGINE_SPARK_SHOW_PROGRESS)) {
if (getSessionConf(ENGINE_SPARK_SHOW_PROGRESS, spark)) {
Some(new SparkConsoleProgressBar(
operation,
activeStages,
conf.get(ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL),
conf.get(ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT)))
getSessionConf(ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL, spark),
getSessionConf(ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT, spark)))
} else {
None
}
Expand Down

0 comments on commit 03e2887

Please sign in to comment.