From b39c9b3a074222a147fdecd4a2499b70c7ed8d68 Mon Sep 17 00:00:00 2001 From: Fei Wang Date: Sat, 3 Feb 2024 21:16:52 -0800 Subject: [PATCH] use short sleep --- .../org/apache/kyuubi/engine/spark/SparkSQLEngine.scala | 2 +- .../org/apache/kyuubi/ha/client/ServiceDiscovery.scala | 4 ++-- .../apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala | 7 +------ 3 files changed, 4 insertions(+), 9 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index a69ed539b07..1339c7193a1 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -129,7 +129,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin info(s"Spark engine is de-registering from engine discovery space.") frontendServices.flatMap(_.discoveryService).foreach(_.stop()) while (backendService.sessionManager.getOpenSessionCount > 0) { - Thread.sleep(1000 * 60) + Thread.sleep(TimeUnit.SECONDS.toMillis(5)) } info(s"Spark engine has no open session now, terminating.") stop() diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala index a1b1466d122..a3f7d282249 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala @@ -17,7 +17,7 @@ package org.apache.kyuubi.ha.client -import java.util.concurrent.CountDownLatch +import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean import org.apache.kyuubi.Logging @@ -68,7 +68,7 @@ abstract class ServiceDiscovery( def stopGracefully(isLost: Boolean = false): Unit = { while (fe.be.sessionManager.getOpenSessionCount > 0) { info(s"${fe.be.sessionManager.getOpenSessionCount} connection(s) are active, delay shutdown") - Thread.sleep(1000 * 60) + Thread.sleep(TimeUnit.SECONDS.toMillis(5)) } isServerLost.set(isLost) gracefulShutdownLatch.countDown() diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala index c1ed7c1bb2a..7edc7e8a310 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala @@ -63,7 +63,6 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient { var lockClient: Lock = _ var leaseClient: Lease = _ var serviceNode: ServiceNode = _ - var serviceDiscovery: ServiceDiscovery = _ var leaseTTL: Long = _ @@ -158,7 +157,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient { } override def monitorState(serviceDiscovery: ServiceDiscovery): Unit = { - this.serviceDiscovery = serviceDiscovery + // not need with etcd } override def tryWithLock[T]( @@ -277,10 +276,6 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient { } serviceNode = null } - if (serviceDiscovery != null) { - serviceDiscovery.stopGracefully(true) - serviceDiscovery = null - } } override def postDeregisterService(namespace: String): Boolean = {