Skip to content

Commit deb9221

Browse files
bowenliang123turboFei
authored andcommitted
[KYUUBI #5730] Tolerate execeptions for periodical scheduled tasks
# 🔍 Description ## Issue References 🔗 As the PR #5727 fixed a problem caused by leaking exception in a scheduled task, the exception thrown should be properly handled to prevent suspension in `ScheduledExecutorService.scheduleWithFixedDelay`. ## Describe Your Solution 🔧 Introducing a util method `scheduleTolerableRunnableWithFixedDelay` for catching possible exceptions in scheduled tasks. ## Types of changes 🔖 - [ ] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan 🧪 #### Behavior Without This Pull Request ⚰️ No behaviour change. #### Behavior With This Pull Request 🎉 This is a fallback measure to ensure all the exceptions handled. Currently all the occurrences are properly handled. #### Related Unit Tests Pass all the CI tests. --- # Checklists ## 📝 Author Self Checklist - [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project - [x] I have performed a self-review - [ ] I have commented my code, particularly in hard-to-understand areas - [ ] I have made corresponding changes to the documentation - [x] My changes generate no new warnings - [ ] I have added tests that prove my fix is effective or that my feature works - [ ] New and existing unit tests pass locally with my changes - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) ## 📝 Committer Pre-Merge Checklist - [ ] Pull request title is okay. - [ ] No license issues. - [ ] Milestone correctly set? - [ ] Test coverage is ok - [ ] Assignees are selected. - [ ] Minimum number of approvals - [ ] No changes are requested **Be nice. Be informative.** Closes #5730 from bowenliang123/tolerate-runnable. Closes #5730 a38eda0 [Bowen Liang] use thread name for error message 6bc13b7 [Bowen Liang] comment 87e2bf2 [Bowen Liang] schedule tolerable runnable Authored-by: Bowen Liang <liangbowen@gf.com.cn> Signed-off-by: fwang12 <fwang12@ebay.com> (cherry picked from commit 8480605) Signed-off-by: fwang12 <fwang12@ebay.com>
1 parent 93727f9 commit deb9221

File tree

10 files changed

+73
-13
lines changed

10 files changed

+73
-13
lines changed

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import org.apache.kyuubi.ha.client.RetryPolicies
4646
import org.apache.kyuubi.service.Serverable
4747
import org.apache.kyuubi.session.SessionHandle
4848
import org.apache.kyuubi.util.{SignalRegister, ThreadUtils}
49+
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
4950

5051
case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngine") {
5152

@@ -167,7 +168,8 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
167168
}
168169
lifetimeTerminatingChecker =
169170
Some(ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-engine-lifetime-checker"))
170-
lifetimeTerminatingChecker.get.scheduleWithFixedDelay(
171+
scheduleTolerableRunnableWithFixedDelay(
172+
lifetimeTerminatingChecker.get,
171173
checkTask,
172174
interval,
173175
interval,

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine}
3232
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
3333
import org.apache.kyuubi.session._
3434
import org.apache.kyuubi.util.ThreadUtils
35+
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
3536

3637
/**
3738
* A [[SessionManager]] constructed with [[SparkSession]] which give it the ability to talk with
@@ -66,8 +67,9 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
6667
if (!userIsolatedSparkSession) {
6768
userIsolatedSparkSessionThread =
6869
Some(ThreadUtils.newDaemonSingleThreadScheduledExecutor("user-isolated-cache-checker"))
69-
userIsolatedSparkSessionThread.foreach {
70-
_.scheduleWithFixedDelay(
70+
userIsolatedSparkSessionThread.foreach { thread =>
71+
scheduleTolerableRunnableWithFixedDelay(
72+
thread,
7173
() => {
7274
userIsolatedCacheLock.synchronized {
7375
val iter = userIsolatedCacheCount.entrySet().iterator()

kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import org.apache.kyuubi.config.KyuubiConf._
3333
import org.apache.kyuubi.operation.OperationManager
3434
import org.apache.kyuubi.service.CompositeService
3535
import org.apache.kyuubi.util.ThreadUtils
36+
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
3637

3738
/**
3839
* The [[SessionManager]] holds the all the connected [[Session]]s, provides us the APIs to
@@ -324,7 +325,12 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
324325
}
325326
}
326327

327-
timeoutChecker.scheduleWithFixedDelay(checkTask, interval, interval, TimeUnit.MILLISECONDS)
328+
scheduleTolerableRunnableWithFixedDelay(
329+
timeoutChecker,
330+
checkTask,
331+
interval,
332+
interval,
333+
TimeUnit.MILLISECONDS)
328334
}
329335

330336
private[kyuubi] def startTerminatingChecker(stop: () => Unit): Unit = if (!isServer) {
@@ -342,7 +348,12 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
342348
}
343349
}
344350
}
345-
timeoutChecker.scheduleWithFixedDelay(checkTask, interval, interval, TimeUnit.MILLISECONDS)
351+
scheduleTolerableRunnableWithFixedDelay(
352+
timeoutChecker,
353+
checkTask,
354+
interval,
355+
interval,
356+
TimeUnit.MILLISECONDS)
346357
}
347358
}
348359
}

kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.kyuubi.util
1919

20-
import java.util.concurrent.{Executors, ExecutorService, LinkedBlockingQueue, ScheduledExecutorService, ScheduledThreadPoolExecutor, ThreadPoolExecutor, TimeUnit}
20+
import java.util.concurrent._
2121

2222
import scala.concurrent.Awaitable
2323
import scala.concurrent.duration.{Duration, FiniteDuration}
@@ -109,4 +109,27 @@ object ThreadUtils extends Logging {
109109
thread.setUncaughtExceptionHandler(NamedThreadFactory.kyuubiUncaughtExceptionHandler)
110110
thread.start()
111111
}
112+
113+
/**
114+
* Schedule a runnable to the scheduled executor service.
115+
* The exceptions thrown in the runnable will be caught and logged.
116+
*/
117+
def scheduleTolerableRunnableWithFixedDelay(
118+
scheduler: ScheduledExecutorService,
119+
runnable: Runnable,
120+
initialDelay: Long,
121+
delay: Long,
122+
timeUnit: TimeUnit): Unit = {
123+
scheduler.scheduleWithFixedDelay(
124+
() =>
125+
try {
126+
runnable.run()
127+
} catch {
128+
case t: Throwable =>
129+
error(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
130+
},
131+
initialDelay,
132+
delay,
133+
timeUnit)
134+
}
112135
}

kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
3939
import org.apache.kyuubi.service.authentication.PlainSASLHelper
4040
import org.apache.kyuubi.session.SessionHandle
4141
import org.apache.kyuubi.util.{ThreadUtils, ThriftUtils}
42+
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
4243

4344
class KyuubiSyncThriftClient private (
4445
protocol: TProtocol,
@@ -125,7 +126,8 @@ class KyuubiSyncThriftClient private (
125126
}
126127
}
127128
engineLastAlive = System.currentTimeMillis()
128-
engineAliveThreadPool.scheduleWithFixedDelay(
129+
scheduleTolerableRunnableWithFixedDelay(
130+
engineAliveThreadPool,
129131
task,
130132
engineAliveProbeInterval,
131133
engineAliveProbeInterval,

kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import org.apache.kyuubi.config.KyuubiConf
3333
import org.apache.kyuubi.config.KyuubiConf._
3434
import org.apache.kyuubi.service.AbstractService
3535
import org.apache.kyuubi.util.{KyuubiHadoopUtils, ThreadUtils}
36+
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
3637
import org.apache.kyuubi.util.reflect.ReflectUtils._
3738

3839
/**
@@ -299,7 +300,8 @@ class HadoopCredentialsManager private (name: String) extends AbstractService(na
299300
}
300301

301302
credentialsTimeoutChecker.foreach { executor =>
302-
executor.scheduleWithFixedDelay(
303+
scheduleTolerableRunnableWithFixedDelay(
304+
executor,
303305
checkTask,
304306
credentialsCheckInterval,
305307
credentialsCheckInterval,

kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service,
3838
import org.apache.kyuubi.service.authentication.{AuthMethods, AuthTypes, KyuubiAuthenticationFactory}
3939
import org.apache.kyuubi.session.{KyuubiSessionManager, SessionHandle}
4040
import org.apache.kyuubi.util.ThreadUtils
41+
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
4142

4243
/**
4344
* A frontend service based on RESTful api via HTTP protocol.
@@ -142,7 +143,12 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
142143
}
143144
}
144145

145-
batchChecker.scheduleWithFixedDelay(task, interval, interval, TimeUnit.MILLISECONDS)
146+
scheduleTolerableRunnableWithFixedDelay(
147+
batchChecker,
148+
task,
149+
interval,
150+
interval,
151+
TimeUnit.MILLISECONDS)
146152
}
147153

148154
@VisibleForTesting

kyuubi-server/src/main/scala/org/apache/kyuubi/server/PeriodicGCService.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
2222
import org.apache.kyuubi.config.KyuubiConf
2323
import org.apache.kyuubi.service.AbstractService
2424
import org.apache.kyuubi.util.ThreadUtils
25+
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
2526

2627
class PeriodicGCService(name: String) extends AbstractService(name) {
2728
def this() = this(classOf[PeriodicGCService].getSimpleName)
@@ -40,6 +41,11 @@ class PeriodicGCService(name: String) extends AbstractService(name) {
4041

4142
private def startGcTrigger(): Unit = {
4243
val interval = conf.get(KyuubiConf.SERVER_PERIODIC_GC_INTERVAL)
43-
gcTrigger.scheduleWithFixedDelay(() => System.gc(), interval, interval, TimeUnit.MILLISECONDS)
44+
scheduleTolerableRunnableWithFixedDelay(
45+
gcTrigger,
46+
() => System.gc(),
47+
interval,
48+
interval,
49+
TimeUnit.MILLISECONDS)
4450
}
4551
}

kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
3232
import org.apache.kyuubi.service.AbstractService
3333
import org.apache.kyuubi.session.SessionType
3434
import org.apache.kyuubi.util.{ClassUtils, JdbcUtils, ThreadUtils}
35+
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
3536

3637
class MetadataManager extends AbstractService("MetadataManager") {
3738
import MetadataManager._
@@ -209,7 +210,8 @@ class MetadataManager extends AbstractService("MetadataManager") {
209210
}
210211
}
211212

212-
metadataCleaner.scheduleWithFixedDelay(
213+
scheduleTolerableRunnableWithFixedDelay(
214+
metadataCleaner,
213215
cleanerTask,
214216
interval,
215217
interval,
@@ -298,7 +300,9 @@ class MetadataManager extends AbstractService("MetadataManager") {
298300
}
299301
}
300302
}
301-
requestsAsyncRetryTrigger.scheduleWithFixedDelay(
303+
304+
scheduleTolerableRunnableWithFixedDelay(
305+
requestsAsyncRetryTrigger,
302306
triggerTask,
303307
requestsRetryInterval,
304308
requestsRetryInterval,

kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import org.apache.kyuubi.server.metadata.{MetadataManager, MetadataRequestsRetry
4141
import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
4242
import org.apache.kyuubi.sql.parser.server.KyuubiParser
4343
import org.apache.kyuubi.util.{SignUtils, ThreadUtils}
44+
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
4445

4546
class KyuubiSessionManager private (name: String) extends SessionManager(name) {
4647

@@ -409,7 +410,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
409410
}
410411
}
411412
}
412-
engineConnectionAliveChecker.scheduleWithFixedDelay(
413+
scheduleTolerableRunnableWithFixedDelay(
414+
engineConnectionAliveChecker,
413415
checkTask,
414416
interval,
415417
interval,

0 commit comments

Comments
 (0)