diff --git a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala index 38527cbf1f8..bb6e8a8a373 100644 --- a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala +++ b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala @@ -62,7 +62,7 @@ abstract class ChatOperation(session: Session) extends AbstractOperation(session // We should use Throwable instead of Exception since `java.lang.NoClassDefFoundError` // could be thrown. case e: Throwable => - state.synchronized { + withLockRequired { val errMsg = Utils.stringifyException(e) if (state == OperationState.TIMEOUT) { val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg") diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala index d734cea0550..eee2fdc9843 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala @@ -57,7 +57,7 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio } override protected def afterRun(): Unit = { - state.synchronized { + withLockRequired { if (!isTerminalState(state)) { setState(OperationState.FINISHED) } @@ -114,7 +114,7 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio // We should use Throwable instead of Exception since `java.lang.NoClassDefFoundError` // could be thrown. case e: Throwable => - state.synchronized { + withLockRequired { val errMsg = Utils.stringifyException(e) if (state == OperationState.TIMEOUT) { val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg") diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala index 81affdff3a3..c025697840e 100644 --- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala +++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala @@ -46,7 +46,7 @@ abstract class HiveOperation(session: Session) extends AbstractOperation(session } override def afterRun(): Unit = { - state.synchronized { + withLockRequired { if (!isTerminalState(state)) { setState(OperationState.FINISHED) } diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala index 6cac42f49ef..f4d1c27e771 100644 --- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala @@ -66,7 +66,7 @@ abstract class JdbcOperation(session: Session) extends AbstractOperation(session // We should use Throwable instead of Exception since `java.lang.NoClassDefFoundError` // could be thrown. case e: Throwable => - state.synchronized { + withLockRequired { val errMsg = Utils.stringifyException(e) if (state == OperationState.TIMEOUT) { val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg") diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala index d2627fd99fd..17cc967e6f1 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala @@ -180,12 +180,7 @@ case class SessionPythonWorker( new BufferedReader(new InputStreamReader(workerProcess.getInputStream), 1) private val lock = new ReentrantLock() - private def withLockRequired[T](block: => T): T = { - try { - lock.lock() - block - } finally lock.unlock() - } + private def withLockRequired[T](block: => T): T = Utils.withLockRequired(lock)(block) /** * Run the python code and return the response. This method maybe invoked internally, diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala index cb7510a890b..6b2a5d9eb93 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala @@ -101,7 +101,7 @@ abstract class SparkOperation(session: Session) super.getStatus } - override def cleanup(targetState: OperationState): Unit = state.synchronized { + override def cleanup(targetState: OperationState): Unit = withLockRequired { operationListener.foreach(_.cleanup()) if (!isTerminalState(state)) { setState(targetState) @@ -174,7 +174,7 @@ abstract class SparkOperation(session: Session) // could be thrown. case e: Throwable => if (cancel && !spark.sparkContext.isStopped) spark.sparkContext.cancelJobGroup(statementId) - state.synchronized { + withLockRequired { val errMsg = Utils.stringifyException(e) if (state == OperationState.TIMEOUT) { val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg") @@ -201,7 +201,7 @@ abstract class SparkOperation(session: Session) } override protected def afterRun(): Unit = { - state.synchronized { + withLockRequired { if (!isTerminalState(state)) { setState(OperationState.FINISHED) } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/KyuubiSparkILoop.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/KyuubiSparkILoop.scala index 27090fae4af..a5437df9244 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/KyuubiSparkILoop.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/KyuubiSparkILoop.scala @@ -29,6 +29,8 @@ import org.apache.spark.repl.SparkILoop import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.util.MutableURLClassLoader +import org.apache.kyuubi.Utils + private[spark] case class KyuubiSparkILoop private ( spark: SparkSession, output: ByteArrayOutputStream) @@ -124,10 +126,5 @@ private[spark] object KyuubiSparkILoop { } private val lock = new ReentrantLock() - private def withLockRequired[T](block: => T): T = { - try { - lock.lock() - block - } finally lock.unlock() - } + private def withLockRequired[T](block: => T): T = Utils.withLockRequired(lock)(block) } diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala index 6e40f65f290..bff0586054d 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala @@ -75,7 +75,7 @@ abstract class TrinoOperation(session: Session) extends AbstractOperation(sessio } override protected def afterRun(): Unit = { - state.synchronized { + withLockRequired { if (!isTerminalState(state)) { setState(OperationState.FINISHED) } @@ -108,7 +108,7 @@ abstract class TrinoOperation(session: Session) extends AbstractOperation(sessio // could be thrown. case e: Throwable => if (cancel && trino.isRunning) trino.cancelLeafStage() - state.synchronized { + withLockRequired { val errMsg = Utils.stringifyException(e) if (state == OperationState.TIMEOUT) { val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg") diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala index 3a03682ff1b..06c572130e7 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala @@ -24,6 +24,7 @@ import java.nio.file.{Files, Path, Paths, StandardCopyOption} import java.text.SimpleDateFormat import java.util.{Date, Properties, TimeZone, UUID} import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.locks.Lock import scala.collection.JavaConverters._ import scala.sys.process._ @@ -407,4 +408,13 @@ object Utils extends Logging { stringWriter.toString } } + + def withLockRequired[T](lock: Lock)(block: => T): T = { + try { + lock.lock() + block + } finally { + lock.unlock() + } + } } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala index d50cb8e243f..2e52757a238 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala @@ -18,13 +18,14 @@ package org.apache.kyuubi.operation import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.locks.ReentrantLock import scala.collection.JavaConverters._ import org.apache.commons.lang3.StringUtils import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TProgressUpdateResp, TProtocolVersion, TRowSet, TStatus, TStatusCode} -import org.apache.kyuubi.{KyuubiSQLException, Logging} +import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils} import org.apache.kyuubi.config.KyuubiConf.OPERATION_IDLE_TIMEOUT import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation import org.apache.kyuubi.operation.OperationState._ @@ -45,7 +46,11 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None - protected def cleanup(targetState: OperationState): Unit = state.synchronized { + private val lock: ReentrantLock = new ReentrantLock() + + protected def withLockRequired[T](block: => T): T = Utils.withLockRequired(lock)(block) + + protected def cleanup(targetState: OperationState): Unit = withLockRequired { if (!isTerminalState(state)) { setState(targetState) Option(getBackgroundHandle).foreach(_.cancel(true)) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala index 8b8561fa99f..587fd575621 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala @@ -136,14 +136,11 @@ class KyuubiSyncThriftClient private ( /** * Lock every rpc call to send them sequentially */ - private def withLockAcquired[T](block: => T): T = { - try { - lock.lock() - if (!protocol.getTransport.isOpen) { - throw KyuubiSQLException.connectionDoesNotExist() - } - block - } finally lock.unlock() + private def withLockAcquired[T](block: => T): T = Utils.withLockRequired(lock) { + if (!protocol.getTransport.isOpen) { + throw KyuubiSQLException.connectionDoesNotExist() + } + block } private def withLockAcquiredAsyncRequest[T](block: => T): T = withLockAcquired { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala index a723ab4b034..e6433cdc9a9 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala @@ -165,7 +165,7 @@ class BatchJobSubmission( override def getOperationLog: Option[OperationLog] = Option(_operationLog) // we can not set to other state if it is canceled - private def setStateIfNotCanceled(newState: OperationState): Unit = state.synchronized { + private def setStateIfNotCanceled(newState: OperationState): Unit = withLockRequired { if (state != CANCELED) { setState(newState) applicationId(_applicationInfo).foreach { appId => @@ -318,7 +318,7 @@ class BatchJobSubmission( } } - override def close(): Unit = state.synchronized { + override def close(): Unit = withLockRequired { if (!isClosedOrCanceled) { try { getOperationLog.foreach(_.close()) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala index 106a11e4b25..e0475394e1e 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala @@ -59,7 +59,7 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi protected def onError(action: String = "operating"): PartialFunction[Throwable, Unit] = { case e: Throwable => - state.synchronized { + withLockRequired { if (isTerminalState(state)) { warn(s"Ignore exception in terminal state with $statementId", e) } else { @@ -101,14 +101,14 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi } override protected def afterRun(): Unit = { - state.synchronized { + withLockRequired { if (!isTerminalState(state)) { setState(OperationState.FINISHED) } } } - override def cancel(): Unit = state.synchronized { + override def cancel(): Unit = withLockRequired { if (!isClosedOrCanceled) { setState(OperationState.CANCELED) MetricsSystem.tracing(_.decCount(MetricRegistry.name(OPERATION_OPEN, opType))) @@ -123,7 +123,7 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi } } - override def close(): Unit = state.synchronized { + override def close(): Unit = withLockRequired { if (!isClosedOrCanceled) { setState(OperationState.CLOSED) MetricsSystem.tracing(_.decCount(MetricRegistry.name(OPERATION_OPEN, opType)))