Skip to content

Commit

Permalink
check no running operation
Browse files Browse the repository at this point in the history
ut

ut

revert

save

save

debug

Revert "debug"

This reverts commit fe46552.

retry
  • Loading branch information
turboFei committed Feb 4, 2024
1 parent 576379c commit 739ccca
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 69 deletions.
103 changes: 52 additions & 51 deletions docs/configuration/settings.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,10 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
val maxLifetime = conf.get(ENGINE_SPARK_MAX_LIFETIME)
val deregistered = new AtomicBoolean(false)
if (maxLifetime > 0) {
val gracefulPeriod = conf.get(ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD)
val checkTask: Runnable = () => {
if (!shutdown.get && System.currentTimeMillis() - getStartTime > maxLifetime) {
val elapsedTime = System.currentTimeMillis() - getStartTime
if (!shutdown.get && elapsedTime > maxLifetime) {
if (deregistered.compareAndSet(false, true)) {
info(s"Spark engine has been running for more than $maxLifetime ms," +
s" deregistering from engine discovery space.")
Expand All @@ -182,6 +184,24 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
info(s"Spark engine has been running for more than $maxLifetime ms" +
s" and no open session now, terminating.")
stop()
} else if (gracefulPeriod > 0 && elapsedTime > maxLifetime + gracefulPeriod) {
// Reached once the engine has outlived max lifetime plus the graceful period: forcibly
// close every session that currently owns no operation. Presumably this lets a later run
// of the check find no open session and stop the engine - confirm against the hidden
// part of this method.
// Operations are re-read for each session instead of being snapshotted once, which keeps
// the window between the "no operation" check and the close as small as possible.
backendService.sessionManager.allSessions().foreach { session =>
  // `exists` short-circuits on the first match; no filtered collection is built just to
  // test for emptiness.
  val hasOperation =
    backendService.sessionManager.operationManager.allOperations()
      .exists(_.getSession == session)
  if (!hasOperation) {
    warn(s"Closing session ${session.handle.identifier} forcibly that has no" +
      s" operation and has been running for more than $gracefulPeriod ms after engine" +
      s" max lifetime.")
    try {
      backendService.sessionManager.closeSession(session.handle)
    } catch {
      // Best effort: one session failing to close must not prevent closing the others.
      // Only non-fatal failures are swallowed, so OutOfMemoryError, InterruptedException
      // and similar still propagate. Fully qualified because this file's import block is
      // not visible from here.
      case scala.util.control.NonFatal(e) =>
        error(s"Error closing session ${session.handle.identifier}", e)
    }
  }
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,13 @@

package org.apache.kyuubi.engine.spark

import org.apache.kyuubi.config.KyuubiConf.{ENGINE_CHECK_INTERVAL, ENGINE_SHARE_LEVEL, ENGINE_SPARK_MAX_INITIAL_WAIT, ENGINE_SPARK_MAX_LIFETIME}
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel.ShareLevel

/**
 * Variant of [[ShareLevelSparkEngineTests]] that mixes in `WithEtcdCluster` and layers
 * `etcdConf` on top of the inherited Kyuubi configuration.
 */
trait EtcdShareLevelSparkEngineSuite
extends ShareLevelSparkEngineTests with WithEtcdCluster {
override def withKyuubiConf: Map[String, String] = {
// NOTE(review): this body appears to show both sides of the commit diff. The expression
// below, with the inline Map of share-level / lifetime settings, looks like the removed
// version; as written its result is discarded. Confirm against the repository.
super.withKyuubiConf ++
etcdConf ++ Map(
ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s",
ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0",
ENGINE_CHECK_INTERVAL.key -> "PT5s")
// Last expression, hence the value actually returned: inherited settings plus etcdConf.
super.withKyuubiConf ++ etcdConf
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.UUID
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime

import org.apache.kyuubi.config.KyuubiConf.{ENGINE_CHECK_INTERVAL, ENGINE_SHARE_LEVEL, ENGINE_SPARK_MAX_INITIAL_WAIT, ENGINE_SPARK_MAX_LIFETIME, ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD}
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel.ShareLevel
import org.apache.kyuubi.operation.HiveJDBCTestHelper
Expand All @@ -35,6 +36,13 @@ trait ShareLevelSparkEngineTests
extends WithDiscoverySparkSQLEngine with HiveJDBCTestHelper {
def shareLevel: ShareLevel

/**
 * Inherited Kyuubi settings plus the engine-lifetime settings shared by every share-level
 * suite: the share level under test, a 5s max lifetime, no initial wait, a 2s check
 * interval and a graceful period of "100".
 */
override def withKyuubiConf: Map[String, String] = {
  val inherited = super.withKyuubiConf
  val lifetimeSettings = Map(
    ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
    ENGINE_SPARK_MAX_LIFETIME.key -> "PT5s",
    ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0",
    ENGINE_CHECK_INTERVAL.key -> "PT2s",
    ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD.key -> "100")
  // Entries on the right-hand side win on key collisions.
  inherited ++ lifetimeSettings
}

override protected def jdbcUrl: String = getJdbcUrl
override val namespace: String = {
// for test, we always use uuid as namespace
Expand Down Expand Up @@ -76,4 +84,23 @@ trait ShareLevelSparkEngineTests
}
}
}

// With a JDBC statement held open, the engine is still expected to reach STOPPED within
// 30 seconds. Only the discovery-path expectation depends on the share level.
test("test spark engine max life-time with graceful period") {
  withDiscoveryClient { client =>
    assert(engine.getServiceState == ServiceState.STARTED)
    assert(client.pathExists(namespace))
    withJdbcStatement() { _ =>
      eventually(Timeout(30.seconds)) {
        val connectionLevel = shareLevel == ShareLevel.CONNECTION
        // Expected for every share level.
        assert(engine.getServiceState == ServiceState.STOPPED)
        if (connectionLevel) {
          // CONNECTION level: the namespace path is expected to be gone.
          assert(client.pathNonExists(namespace))
        } else {
          // Any other level: the namespace path is expected to remain.
          assert(client.pathExists(namespace))
        }
      }
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,13 @@

package org.apache.kyuubi.engine.spark

import org.apache.kyuubi.config.KyuubiConf.ENGINE_CHECK_INTERVAL
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_INITIAL_WAIT
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_LIFETIME
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel.ShareLevel

/**
 * Variant of [[ShareLevelSparkEngineTests]] that mixes in `WithEmbeddedZookeeper` and
 * layers `zookeeperConf` on top of the inherited Kyuubi configuration.
 */
trait ZookeeperShareLevelSparkEngineSuite
extends ShareLevelSparkEngineTests with WithEmbeddedZookeeper {
override def withKyuubiConf: Map[String, String] = {
// NOTE(review): this body appears to show both sides of the commit diff. The expression
// below, with the inline Map of share-level / lifetime settings, looks like the removed
// version; as written its result is discarded. Confirm against the repository.
super.withKyuubiConf ++
zookeeperConf ++ Map(
ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s",
ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0",
ENGINE_CHECK_INTERVAL.key -> "PT5s")
// Last expression, hence the value actually returned: inherited settings plus
// zookeeperConf.
super.withKyuubiConf ++ zookeeperConf
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1330,6 +1330,16 @@ object KyuubiConf {
.timeConf
.createWithDefault(0)

/**
 * Time-valued setting (built with `timeConf`, default 0) that bounds how long a Spark
 * engine past its max lifetime keeps waiting for connections to go away before sessions
 * without operations are closed forcibly. With the default of 0, or any non-positive
 * value, the engine waits indefinitely, as the doc text below states.
 *
 * NOTE(review): the value is presumably in milliseconds - the engine logs it with an
 * "ms" suffix - confirm against `timeConf`.
 */
val ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD: ConfigEntry[Long] =
buildConf("kyuubi.session.engine.spark.max.lifetime.gracefulPeriod")
// User-facing description. NOTE(review): this text is presumably mirrored in
// docs/configuration/settings.md, so keep the two in sync when editing - confirm.
.doc("Graceful period for Spark engine to wait the connections disconnected after reaching" +
" the end of life. After the graceful period, all the connections without running" +
" operations will be forcibly disconnected. 0 or negative means always waiting the" +
" connections disconnected.")
.version("1.8.1")
.timeConf
.createWithDefault(0)

val ENGINE_SPARK_MAX_INITIAL_WAIT: ConfigEntry[Long] =
buildConf("kyuubi.session.engine.spark.max.initial.wait")
.doc("Max wait time for the initial connection to Spark engine. The engine will" +
Expand Down

0 comments on commit 739ccca

Please sign in to comment.