diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala
index b9fb9325999..2e33d8ce6db 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.util.kvstore.KVIndex
 
 import org.apache.kyuubi.Logging
+import org.apache.kyuubi.config.ConfigEntry
 import org.apache.kyuubi.util.SemanticVersion
 
 object KyuubiSparkUtil extends Logging {
@@ -98,4 +99,18 @@ object KyuubiSparkUtil extends Logging {
   // Given that we are on the Spark SQL engine side, the [[org.apache.spark.SPARK_VERSION]] can be
   // represented as the runtime version of the Spark SQL engine.
   lazy val SPARK_ENGINE_RUNTIME_VERSION: SemanticVersion = SemanticVersion(SPARK_VERSION)
+
+  /**
+   * Get a session-level config value.
+   * @param configEntry the config entry to look up
+   * @param spark the Spark session that may carry a session-level override
+   * @tparam T the type of the config value
+   * @return the value set on the Spark session if present,
+   *         otherwise the value from Kyuubi's engine-level config
+   */
+  def getSessionConf[T](configEntry: ConfigEntry[T], spark: SparkSession): T = {
+    spark.conf.getOption(configEntry.key).map(configEntry.valueConverter).getOrElse {
+      SparkSQLEngine.kyuubiConf.get(configEntry)
+    }
+  }
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
index 4e4a940d295..b51a0dd3183 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
@@ -28,7 +28,7 @@ import org.apache.kyuubi.Logging
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_SHOW_PROGRESS, ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT, ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL}
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY
-import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_SQL_EXECUTION_ID_KEY
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{getSessionConf, SPARK_SQL_EXECUTION_ID_KEY}
 import org.apache.kyuubi.engine.spark.operation.ExecuteStatement
 import org.apache.kyuubi.operation.Operation
 import org.apache.kyuubi.operation.log.OperationLog
@@ -48,14 +48,13 @@ class SQLOperationListener(
   private lazy val activeStages = new ConcurrentHashMap[SparkStageAttempt, SparkStageInfo]()
   private var executionId: Option[Long] = None
 
-  private val conf: KyuubiConf = operation.getSession.sessionManager.getConf
   private lazy val consoleProgressBar =
-    if (conf.get(ENGINE_SPARK_SHOW_PROGRESS)) {
+    if (getSessionConf(ENGINE_SPARK_SHOW_PROGRESS, spark)) {
       Some(new SparkConsoleProgressBar(
         operation,
         activeStages,
-        conf.get(ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL),
-        conf.get(ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT)))
+        getSessionConf(ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL, spark),
+        getSessionConf(ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT, spark)))
     } else {
       None
     }
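
A minimal usage sketch of the new helper (not part of the patch): it shows the lookup order of getSessionConf, assuming a SparkSession `spark` inside a running SparkSQLEngine; the config-setting calls and assertions are illustrative only.

// Usage sketch (illustrative, not part of the patch): a session-level value set on the
// Spark session wins; otherwise the engine-level KyuubiConf value is returned.
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS
import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine}

// A session-level override (e.g. what a user's SET statement writes into the Spark session conf)
// takes precedence:
spark.conf.set(ENGINE_SPARK_SHOW_PROGRESS.key, "true")
assert(KyuubiSparkUtil.getSessionConf(ENGINE_SPARK_SHOW_PROGRESS, spark))

// With no session-level override, the helper falls back to the engine's KyuubiConf:
spark.conf.unset(ENGINE_SPARK_SHOW_PROGRESS.key)
assert(KyuubiSparkUtil.getSessionConf(ENGINE_SPARK_SHOW_PROGRESS, spark) ==
  SparkSQLEngine.kyuubiConf.get(ENGINE_SPARK_SHOW_PROGRESS))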