diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index 6bb9a148884..d1331cd0284 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -129,7 +129,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin info(s"Spark engine is de-registering from engine discovery space.") frontendServices.flatMap(_.discoveryService).foreach(_.stop()) while (backendService.sessionManager.getOpenSessionCount > 0) { - Thread.sleep(TimeUnit.SECONDS.toMillis(5)) + Thread.sleep(TimeUnit.SECONDS.toMillis(10)) } info(s"Spark engine has no open session now, terminating.") stop() @@ -175,12 +175,9 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin val elapsedTime = System.currentTimeMillis() - getStartTime if (!shutdown.get && elapsedTime > maxLifetime) { if (deregistered.compareAndSet(false, true)) { - ThreadUtils.runInNewThread("engine-de-register", isDaemon = false) { - // for ETCD, the de-registering process might be blocked, so deregister it async - info(s"Spark engine has been running for more than $maxLifetime ms," + - s" deregistering from engine discovery space.") - frontendServices.flatMap(_.discoveryService).foreach(_.stop()) - } + info(s"Spark engine has been running for more than $maxLifetime ms," + + s" deregistering from engine discovery space.") + frontendServices.flatMap(_.discoveryService).foreach(_.stop()) } if (backendService.sessionManager.getOpenSessionCount <= 0) { diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala index a3f7d282249..c7eee15030f 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala @@ -68,7 +68,7 @@ abstract class ServiceDiscovery( def stopGracefully(isLost: Boolean = false): Unit = { while (fe.be.sessionManager.getOpenSessionCount > 0) { info(s"${fe.be.sessionManager.getOpenSessionCount} connection(s) are active, delay shutdown") - Thread.sleep(TimeUnit.SECONDS.toMillis(5)) + Thread.sleep(TimeUnit.SECONDS.toMillis(10)) } isServerLost.set(isLost) gracefulShutdownLatch.countDown() diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala index 7edc7e8a310..2f27f7b8096 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala @@ -53,6 +53,7 @@ import org.apache.kyuubi.ha.client.DiscoveryPaths import org.apache.kyuubi.ha.client.ServiceDiscovery import org.apache.kyuubi.ha.client.ServiceNodeInfo import org.apache.kyuubi.ha.client.etcd.EtcdDiscoveryClient._ +import org.apache.kyuubi.util.ThreadUtils class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient { @@ -381,7 +382,12 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient { .filter(_.getEventType == WatchEvent.EventType.DELETE).foreach(_ => { warn(s"This Kyuubi instance ${instance} is now de-registered from" + s" ETCD. The server will be shut down after the last client session completes.") - serviceDiscovery.stopGracefully() + // for jetcd, the watcher event process might block the main thread, + // so start a new thread to do the de-register work as a workaround, + // see details in https://github.com/etcd-io/jetcd/issues/1089 + ThreadUtils.runInNewThread("deregister-watcher-thread", isDaemon = false) { + serviceDiscovery.stopGracefully() + } }) }