Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
  • Loading branch information
turboFei committed Feb 6, 2024
1 parent 4329a85 commit 0b05ddc
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
info(s"Spark engine is de-registering from engine discovery space.")
frontendServices.flatMap(_.discoveryService).foreach(_.stop())
while (backendService.sessionManager.getOpenSessionCount > 0) {
Thread.sleep(TimeUnit.SECONDS.toMillis(5))
Thread.sleep(TimeUnit.SECONDS.toMillis(10))
}
info(s"Spark engine has no open session now, terminating.")
stop()
Expand Down Expand Up @@ -175,12 +175,9 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
val elapsedTime = System.currentTimeMillis() - getStartTime
if (!shutdown.get && elapsedTime > maxLifetime) {
if (deregistered.compareAndSet(false, true)) {
ThreadUtils.runInNewThread("engine-de-register", isDaemon = false) {
// for ETCD, the de-registering process might be blocked, so deregister it async
info(s"Spark engine has been running for more than $maxLifetime ms," +
s" deregistering from engine discovery space.")
frontendServices.flatMap(_.discoveryService).foreach(_.stop())
}
info(s"Spark engine has been running for more than $maxLifetime ms," +
s" deregistering from engine discovery space.")
frontendServices.flatMap(_.discoveryService).foreach(_.stop())
}

if (backendService.sessionManager.getOpenSessionCount <= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ abstract class ServiceDiscovery(
def stopGracefully(isLost: Boolean = false): Unit = {
while (fe.be.sessionManager.getOpenSessionCount > 0) {
info(s"${fe.be.sessionManager.getOpenSessionCount} connection(s) are active, delay shutdown")
Thread.sleep(TimeUnit.SECONDS.toMillis(5))
Thread.sleep(TimeUnit.SECONDS.toMillis(10))
}
isServerLost.set(isLost)
gracefulShutdownLatch.countDown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import org.apache.kyuubi.ha.client.DiscoveryPaths
import org.apache.kyuubi.ha.client.ServiceDiscovery
import org.apache.kyuubi.ha.client.ServiceNodeInfo
import org.apache.kyuubi.ha.client.etcd.EtcdDiscoveryClient._
import org.apache.kyuubi.util.ThreadUtils

class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {

Expand Down Expand Up @@ -381,7 +382,12 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
.filter(_.getEventType == WatchEvent.EventType.DELETE).foreach(_ => {
warn(s"This Kyuubi instance ${instance} is now de-registered from" +
s" ETCD. The server will be shut down after the last client session completes.")
serviceDiscovery.stopGracefully()
// for jetcd, the watcher event process might block the main thread,
// so start a new thread to do the de-register work as a workaround,
// see details in https://github.com/etcd-io/jetcd/issues/1089
ThreadUtils.runInNewThread("deregister-watcher-thread", isDaemon = false) {
serviceDiscovery.stopGracefully()
}
})
}

Expand Down

0 comments on commit 0b05ddc

Please sign in to comment.