Skip to content

Commit

Permalink
[KYUUBI #5730] Tolerate execeptions for periodical scheduled tasks
Browse files Browse the repository at this point in the history
# 🔍 Description
## Issue References 🔗

As the PR #5727 fixed a problem caused by leaking exception in a scheduled task, the exception thrown should be properly handled to prevent suspension in `ScheduledExecutorService.scheduleWithFixedDelay`.

## Describe Your Solution 🔧
Introducing a util method `scheduleTolerableRunnableWithFixedDelay` for catching possible exceptions in scheduled tasks.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️
No behaviour change.

#### Behavior With This Pull Request 🎉
This is a fallback measure to ensure all the exceptions handled.
Currently all the occurrences are properly handled.

#### Related Unit Tests
Pass all the CI tests.

---

# Checklists
## 📝 Author Self Checklist

- [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
- [x] I have performed a self-review
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

## 📝 Committer Pre-Merge Checklist

- [ ] Pull request title is okay.
- [ ] No license issues.
- [ ] Milestone correctly set?
- [ ] Test coverage is ok
- [ ] Assignees are selected.
- [ ] Minimum number of approvals
- [ ] No changes are requested

**Be nice. Be informative.**

Closes #5730 from bowenliang123/tolerate-runnable.

Closes #5730

a38eda0 [Bowen Liang] use thread name for error message
6bc13b7 [Bowen Liang] comment
87e2bf2 [Bowen Liang] schedule tolerable runnable

Authored-by: Bowen Liang <liangbowen@gf.com.cn>
Signed-off-by: fwang12 <fwang12@ebay.com>
  • Loading branch information
bowenliang123 authored and turboFei committed Nov 20, 2023
1 parent 24fbd57 commit 8480605
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import org.apache.kyuubi.ha.client.RetryPolicies
import org.apache.kyuubi.service.Serverable
import org.apache.kyuubi.session.SessionHandle
import org.apache.kyuubi.util.{SignalRegister, ThreadUtils}
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay

case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngine") {

Expand Down Expand Up @@ -167,7 +168,8 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
}
lifetimeTerminatingChecker =
Some(ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-engine-lifetime-checker"))
lifetimeTerminatingChecker.get.scheduleWithFixedDelay(
scheduleTolerableRunnableWithFixedDelay(
lifetimeTerminatingChecker.get,
checkTask,
interval,
interval,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine}
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
import org.apache.kyuubi.session._
import org.apache.kyuubi.util.ThreadUtils
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay

/**
* A [[SessionManager]] constructed with [[SparkSession]] which give it the ability to talk with
Expand Down Expand Up @@ -66,8 +67,9 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
if (!userIsolatedSparkSession) {
userIsolatedSparkSessionThread =
Some(ThreadUtils.newDaemonSingleThreadScheduledExecutor("user-isolated-cache-checker"))
userIsolatedSparkSessionThread.foreach {
_.scheduleWithFixedDelay(
userIsolatedSparkSessionThread.foreach { thread =>
scheduleTolerableRunnableWithFixedDelay(
thread,
() => {
userIsolatedCacheLock.synchronized {
val iter = userIsolatedCacheCount.entrySet().iterator()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.operation.OperationManager
import org.apache.kyuubi.service.CompositeService
import org.apache.kyuubi.util.ThreadUtils
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay

/**
* The [[SessionManager]] holds the all the connected [[Session]]s, provides us the APIs to
Expand Down Expand Up @@ -324,7 +325,12 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
}
}

timeoutChecker.scheduleWithFixedDelay(checkTask, interval, interval, TimeUnit.MILLISECONDS)
scheduleTolerableRunnableWithFixedDelay(
timeoutChecker,
checkTask,
interval,
interval,
TimeUnit.MILLISECONDS)
}

private[kyuubi] def startTerminatingChecker(stop: () => Unit): Unit = if (!isServer) {
Expand All @@ -342,7 +348,12 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
}
}
}
timeoutChecker.scheduleWithFixedDelay(checkTask, interval, interval, TimeUnit.MILLISECONDS)
scheduleTolerableRunnableWithFixedDelay(
timeoutChecker,
checkTask,
interval,
interval,
TimeUnit.MILLISECONDS)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.kyuubi.util

import java.util.concurrent.{Executors, ExecutorService, LinkedBlockingQueue, ScheduledExecutorService, ScheduledThreadPoolExecutor, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent._

import scala.concurrent.Awaitable
import scala.concurrent.duration.{Duration, FiniteDuration}
Expand Down Expand Up @@ -109,4 +109,27 @@ object ThreadUtils extends Logging {
thread.setUncaughtExceptionHandler(NamedThreadFactory.kyuubiUncaughtExceptionHandler)
thread.start()
}

/**
* Schedule a runnable to the scheduled executor service.
* The exceptions thrown in the runnable will be caught and logged.
*/
def scheduleTolerableRunnableWithFixedDelay(
scheduler: ScheduledExecutorService,
runnable: Runnable,
initialDelay: Long,
delay: Long,
timeUnit: TimeUnit): Unit = {
scheduler.scheduleWithFixedDelay(
() =>
try {
runnable.run()
} catch {
case t: Throwable =>
error(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
},
initialDelay,
delay,
timeUnit)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.service.authentication.PlainSASLHelper
import org.apache.kyuubi.session.SessionHandle
import org.apache.kyuubi.util.{ThreadUtils, ThriftUtils}
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay

class KyuubiSyncThriftClient private (
protocol: TProtocol,
Expand Down Expand Up @@ -125,7 +126,8 @@ class KyuubiSyncThriftClient private (
}
}
engineLastAlive = System.currentTimeMillis()
engineAliveThreadPool.scheduleWithFixedDelay(
scheduleTolerableRunnableWithFixedDelay(
engineAliveThreadPool,
task,
engineAliveProbeInterval,
engineAliveProbeInterval,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.service.AbstractService
import org.apache.kyuubi.util.{KyuubiHadoopUtils, ThreadUtils}
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
import org.apache.kyuubi.util.reflect.ReflectUtils._

/**
Expand Down Expand Up @@ -299,7 +300,8 @@ class HadoopCredentialsManager private (name: String) extends AbstractService(na
}

credentialsTimeoutChecker.foreach { executor =>
executor.scheduleWithFixedDelay(
scheduleTolerableRunnableWithFixedDelay(
executor,
checkTask,
credentialsCheckInterval,
credentialsCheckInterval,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service,
import org.apache.kyuubi.service.authentication.{AuthMethods, AuthTypes, KyuubiAuthenticationFactory}
import org.apache.kyuubi.session.{KyuubiSessionManager, SessionHandle}
import org.apache.kyuubi.util.ThreadUtils
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay

/**
* A frontend service based on RESTful api via HTTP protocol.
Expand Down Expand Up @@ -142,7 +143,12 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
}
}

batchChecker.scheduleWithFixedDelay(task, interval, interval, TimeUnit.MILLISECONDS)
scheduleTolerableRunnableWithFixedDelay(
batchChecker,
task,
interval,
interval,
TimeUnit.MILLISECONDS)
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.service.AbstractService
import org.apache.kyuubi.util.ThreadUtils
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay

class PeriodicGCService(name: String) extends AbstractService(name) {
def this() = this(classOf[PeriodicGCService].getSimpleName)
Expand All @@ -40,6 +41,11 @@ class PeriodicGCService(name: String) extends AbstractService(name) {

private def startGcTrigger(): Unit = {
val interval = conf.get(KyuubiConf.SERVER_PERIODIC_GC_INTERVAL)
gcTrigger.scheduleWithFixedDelay(() => System.gc(), interval, interval, TimeUnit.MILLISECONDS)
scheduleTolerableRunnableWithFixedDelay(
gcTrigger,
() => System.gc(),
interval,
interval,
TimeUnit.MILLISECONDS)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
import org.apache.kyuubi.service.AbstractService
import org.apache.kyuubi.session.SessionType
import org.apache.kyuubi.util.{ClassUtils, JdbcUtils, ThreadUtils}
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay

class MetadataManager extends AbstractService("MetadataManager") {
import MetadataManager._
Expand Down Expand Up @@ -209,7 +210,8 @@ class MetadataManager extends AbstractService("MetadataManager") {
}
}

metadataCleaner.scheduleWithFixedDelay(
scheduleTolerableRunnableWithFixedDelay(
metadataCleaner,
cleanerTask,
interval,
interval,
Expand Down Expand Up @@ -298,7 +300,9 @@ class MetadataManager extends AbstractService("MetadataManager") {
}
}
}
requestsAsyncRetryTrigger.scheduleWithFixedDelay(

scheduleTolerableRunnableWithFixedDelay(
requestsAsyncRetryTrigger,
triggerTask,
requestsRetryInterval,
requestsRetryInterval,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import org.apache.kyuubi.server.metadata.{MetadataManager, MetadataRequestsRetry
import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
import org.apache.kyuubi.sql.parser.server.KyuubiParser
import org.apache.kyuubi.util.{SignUtils, ThreadUtils}
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay

class KyuubiSessionManager private (name: String) extends SessionManager(name) {

Expand Down Expand Up @@ -409,7 +410,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
}
}
}
engineConnectionAliveChecker.scheduleWithFixedDelay(
scheduleTolerableRunnableWithFixedDelay(
engineConnectionAliveChecker,
checkTask,
interval,
interval,
Expand Down

0 comments on commit 8480605

Please sign in to comment.