Skip to content

Tolerate exceptions for periodical scheduled tasks #5730

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import org.apache.kyuubi.ha.client.RetryPolicies
import org.apache.kyuubi.service.Serverable
import org.apache.kyuubi.session.SessionHandle
import org.apache.kyuubi.util.{SignalRegister, ThreadUtils}
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay

case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngine") {

Expand Down Expand Up @@ -167,7 +168,8 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
}
lifetimeTerminatingChecker =
Some(ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-engine-lifetime-checker"))
lifetimeTerminatingChecker.get.scheduleWithFixedDelay(
scheduleTolerableRunnableWithFixedDelay(
lifetimeTerminatingChecker.get,
checkTask,
interval,
interval,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine}
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
import org.apache.kyuubi.session._
import org.apache.kyuubi.util.ThreadUtils
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay

/**
* A [[SessionManager]] constructed with [[SparkSession]] which gives it the ability to talk with
Expand Down Expand Up @@ -66,8 +67,9 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
if (!userIsolatedSparkSession) {
userIsolatedSparkSessionThread =
Some(ThreadUtils.newDaemonSingleThreadScheduledExecutor("user-isolated-cache-checker"))
userIsolatedSparkSessionThread.foreach {
_.scheduleWithFixedDelay(
userIsolatedSparkSessionThread.foreach { thread =>
scheduleTolerableRunnableWithFixedDelay(
thread,
() => {
userIsolatedCacheLock.synchronized {
val iter = userIsolatedCacheCount.entrySet().iterator()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.operation.OperationManager
import org.apache.kyuubi.service.CompositeService
import org.apache.kyuubi.util.ThreadUtils
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay

/**
* The [[SessionManager]] holds all the connected [[Session]]s, provides us the APIs to
Expand Down Expand Up @@ -324,7 +325,12 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
}
}

timeoutChecker.scheduleWithFixedDelay(checkTask, interval, interval, TimeUnit.MILLISECONDS)
scheduleTolerableRunnableWithFixedDelay(
timeoutChecker,
checkTask,
interval,
interval,
TimeUnit.MILLISECONDS)
}

private[kyuubi] def startTerminatingChecker(stop: () => Unit): Unit = if (!isServer) {
Expand All @@ -342,7 +348,12 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
}
}
}
timeoutChecker.scheduleWithFixedDelay(checkTask, interval, interval, TimeUnit.MILLISECONDS)
scheduleTolerableRunnableWithFixedDelay(
timeoutChecker,
checkTask,
interval,
interval,
TimeUnit.MILLISECONDS)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.kyuubi.util

import java.util.concurrent.{Executors, ExecutorService, LinkedBlockingQueue, ScheduledExecutorService, ScheduledThreadPoolExecutor, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent._

import scala.concurrent.Awaitable
import scala.concurrent.duration.{Duration, FiniteDuration}
Expand Down Expand Up @@ -109,4 +109,27 @@ object ThreadUtils extends Logging {
thread.setUncaughtExceptionHandler(NamedThreadFactory.kyuubiUncaughtExceptionHandler)
thread.start()
}

/**
 * Submits `runnable` to `scheduler` for repeated execution via `scheduleWithFixedDelay`.
 * Whatever the runnable throws is caught and logged inside the wrapper, so the throwable
 * never propagates to the scheduler.
 *
 * @param scheduler the scheduled executor service that runs the task
 * @param runnable the task to execute on every tick
 * @param initialDelay delay before the first execution, expressed in `timeUnit`
 * @param delay delay passed to `scheduleWithFixedDelay` between runs, expressed in `timeUnit`
 * @param timeUnit unit of `initialDelay` and `delay`
 */
def scheduleTolerableRunnableWithFixedDelay(
    scheduler: ScheduledExecutorService,
    runnable: Runnable,
    initialDelay: Long,
    delay: Long,
    timeUnit: TimeUnit): Unit = {
  // Wrap the caller's task so that a failing run is only logged.
  val guardedTask: Runnable = new Runnable {
    override def run(): Unit = {
      try runnable.run()
      catch {
        case cause: Throwable =>
          error(s"Uncaught exception in thread ${Thread.currentThread().getName}", cause)
      }
    }
  }
  scheduler.scheduleWithFixedDelay(guardedTask, initialDelay, delay, timeUnit)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.service.authentication.PlainSASLHelper
import org.apache.kyuubi.session.SessionHandle
import org.apache.kyuubi.util.{ThreadUtils, ThriftUtils}
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay

class KyuubiSyncThriftClient private (
protocol: TProtocol,
Expand Down Expand Up @@ -125,7 +126,8 @@ class KyuubiSyncThriftClient private (
}
}
engineLastAlive = System.currentTimeMillis()
engineAliveThreadPool.scheduleWithFixedDelay(
scheduleTolerableRunnableWithFixedDelay(
engineAliveThreadPool,
task,
engineAliveProbeInterval,
engineAliveProbeInterval,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.service.AbstractService
import org.apache.kyuubi.util.{KyuubiHadoopUtils, ThreadUtils}
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
import org.apache.kyuubi.util.reflect.ReflectUtils._

/**
Expand Down Expand Up @@ -299,7 +300,8 @@ class HadoopCredentialsManager private (name: String) extends AbstractService(na
}

credentialsTimeoutChecker.foreach { executor =>
executor.scheduleWithFixedDelay(
scheduleTolerableRunnableWithFixedDelay(
executor,
checkTask,
credentialsCheckInterval,
credentialsCheckInterval,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service,
import org.apache.kyuubi.service.authentication.{AuthMethods, AuthTypes, KyuubiAuthenticationFactory}
import org.apache.kyuubi.session.{KyuubiSessionManager, SessionHandle}
import org.apache.kyuubi.util.ThreadUtils
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay

/**
* A frontend service based on RESTful api via HTTP protocol.
Expand Down Expand Up @@ -142,7 +143,12 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
}
}

batchChecker.scheduleWithFixedDelay(task, interval, interval, TimeUnit.MILLISECONDS)
scheduleTolerableRunnableWithFixedDelay(
batchChecker,
task,
interval,
interval,
TimeUnit.MILLISECONDS)
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.service.AbstractService
import org.apache.kyuubi.util.ThreadUtils
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay

class PeriodicGCService(name: String) extends AbstractService(name) {
def this() = this(classOf[PeriodicGCService].getSimpleName)
Expand All @@ -40,6 +41,11 @@ class PeriodicGCService(name: String) extends AbstractService(name) {

/**
 * Reads the periodic GC interval from the configuration and schedules `System.gc()` on
 * `gcTrigger`, using that interval as both the initial delay and the fixed delay
 * (in milliseconds).
 */
private def startGcTrigger(): Unit = {
val interval = conf.get(KyuubiConf.SERVER_PERIODIC_GC_INTERVAL)
// NOTE(review): this diff hunk renders without +/- markers; going by the hunk header
// (6 lines before, 11 after), the direct `gcTrigger.scheduleWithFixedDelay` call below is
// the removed line and the `scheduleTolerableRunnableWithFixedDelay` call is its replacement.
gcTrigger.scheduleWithFixedDelay(() => System.gc(), interval, interval, TimeUnit.MILLISECONDS)
scheduleTolerableRunnableWithFixedDelay(
gcTrigger,
() => System.gc(),
interval,
interval,
TimeUnit.MILLISECONDS)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
import org.apache.kyuubi.service.AbstractService
import org.apache.kyuubi.session.SessionType
import org.apache.kyuubi.util.{ClassUtils, JdbcUtils, ThreadUtils}
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay

class MetadataManager extends AbstractService("MetadataManager") {
import MetadataManager._
Expand Down Expand Up @@ -209,7 +210,8 @@ class MetadataManager extends AbstractService("MetadataManager") {
}
}

metadataCleaner.scheduleWithFixedDelay(
scheduleTolerableRunnableWithFixedDelay(
metadataCleaner,
cleanerTask,
interval,
interval,
Expand Down Expand Up @@ -298,7 +300,9 @@ class MetadataManager extends AbstractService("MetadataManager") {
}
}
}
requestsAsyncRetryTrigger.scheduleWithFixedDelay(

scheduleTolerableRunnableWithFixedDelay(
requestsAsyncRetryTrigger,
triggerTask,
requestsRetryInterval,
requestsRetryInterval,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import org.apache.kyuubi.server.metadata.{MetadataManager, MetadataRequestsRetry
import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
import org.apache.kyuubi.sql.parser.server.KyuubiParser
import org.apache.kyuubi.util.{SignUtils, ThreadUtils}
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay

class KyuubiSessionManager private (name: String) extends SessionManager(name) {

Expand Down Expand Up @@ -409,7 +410,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
}
}
}
engineConnectionAliveChecker.scheduleWithFixedDelay(
scheduleTolerableRunnableWithFixedDelay(
engineConnectionAliveChecker,
checkTask,
interval,
interval,
Expand Down