Skip to content

Commit

Permalink
[KYUUBI apache#4739] Add operation lock instead of locking state Enum…
Browse files Browse the repository at this point in the history
…eration

### _Why are the changes needed?_

We meet an issue that cause all the operation stuck when closing operation.

Because now all the operations try to lock a Scala Enumeration val.

And if one of them stuck, all the others will be keep stuck.

In this pr, I add a lock for each operation.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request

Closes apache#4739 from turboFei/op_lock.

Closes apache#4739

535400a [fwang12] revert
a934389 [fwang12] lockInterruptibly
274abc9 [fwang12] utils
ceda731 [fwang12] op lock

Authored-by: fwang12 <fwang12@ebay.com>
Signed-off-by: fwang12 <fwang12@ebay.com>
  • Loading branch information
turboFei committed Apr 21, 2023
1 parent 6876f82 commit ccacb33
Show file tree
Hide file tree
Showing 13 changed files with 42 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ abstract class ChatOperation(session: Session) extends AbstractOperation(session
// We should use Throwable instead of Exception since `java.lang.NoClassDefFoundError`
// could be thrown.
case e: Throwable =>
state.synchronized {
withLockRequired {
val errMsg = Utils.stringifyException(e)
if (state == OperationState.TIMEOUT) {
val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio
}

override protected def afterRun(): Unit = {
state.synchronized {
withLockRequired {
if (!isTerminalState(state)) {
setState(OperationState.FINISHED)
}
Expand Down Expand Up @@ -114,7 +114,7 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio
// We should use Throwable instead of Exception since `java.lang.NoClassDefFoundError`
// could be thrown.
case e: Throwable =>
state.synchronized {
withLockRequired {
val errMsg = Utils.stringifyException(e)
if (state == OperationState.TIMEOUT) {
val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ abstract class HiveOperation(session: Session) extends AbstractOperation(session
}

override def afterRun(): Unit = {
state.synchronized {
withLockRequired {
if (!isTerminalState(state)) {
setState(OperationState.FINISHED)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ abstract class JdbcOperation(session: Session) extends AbstractOperation(session
// We should use Throwable instead of Exception since `java.lang.NoClassDefFoundError`
// could be thrown.
case e: Throwable =>
state.synchronized {
withLockRequired {
val errMsg = Utils.stringifyException(e)
if (state == OperationState.TIMEOUT) {
val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,7 @@ case class SessionPythonWorker(
new BufferedReader(new InputStreamReader(workerProcess.getInputStream), 1)
private val lock = new ReentrantLock()

private def withLockRequired[T](block: => T): T = {
try {
lock.lock()
block
} finally lock.unlock()
}
private def withLockRequired[T](block: => T): T = Utils.withLockRequired(lock)(block)

/**
* Run the python code and return the response. This method maybe invoked internally,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ abstract class SparkOperation(session: Session)
super.getStatus
}

override def cleanup(targetState: OperationState): Unit = state.synchronized {
override def cleanup(targetState: OperationState): Unit = withLockRequired {
operationListener.foreach(_.cleanup())
if (!isTerminalState(state)) {
setState(targetState)
Expand Down Expand Up @@ -174,7 +174,7 @@ abstract class SparkOperation(session: Session)
// could be thrown.
case e: Throwable =>
if (cancel && !spark.sparkContext.isStopped) spark.sparkContext.cancelJobGroup(statementId)
state.synchronized {
withLockRequired {
val errMsg = Utils.stringifyException(e)
if (state == OperationState.TIMEOUT) {
val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg")
Expand All @@ -201,7 +201,7 @@ abstract class SparkOperation(session: Session)
}

override protected def afterRun(): Unit = {
state.synchronized {
withLockRequired {
if (!isTerminalState(state)) {
setState(OperationState.FINISHED)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import org.apache.spark.repl.SparkILoop
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.util.MutableURLClassLoader

import org.apache.kyuubi.Utils

private[spark] case class KyuubiSparkILoop private (
spark: SparkSession,
output: ByteArrayOutputStream)
Expand Down Expand Up @@ -124,10 +126,5 @@ private[spark] object KyuubiSparkILoop {
}

private val lock = new ReentrantLock()
private def withLockRequired[T](block: => T): T = {
try {
lock.lock()
block
} finally lock.unlock()
}
private def withLockRequired[T](block: => T): T = Utils.withLockRequired(lock)(block)
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ abstract class TrinoOperation(session: Session) extends AbstractOperation(sessio
}

override protected def afterRun(): Unit = {
state.synchronized {
withLockRequired {
if (!isTerminalState(state)) {
setState(OperationState.FINISHED)
}
Expand Down Expand Up @@ -108,7 +108,7 @@ abstract class TrinoOperation(session: Session) extends AbstractOperation(sessio
// could be thrown.
case e: Throwable =>
if (cancel && trino.isRunning) trino.cancelLeafStage()
state.synchronized {
withLockRequired {
val errMsg = Utils.stringifyException(e)
if (state == OperationState.TIMEOUT) {
val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg")
Expand Down
10 changes: 10 additions & 0 deletions kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import java.nio.file.{Files, Path, Paths, StandardCopyOption}
import java.text.SimpleDateFormat
import java.util.{Date, Properties, TimeZone, UUID}
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.Lock

import scala.collection.JavaConverters._
import scala.sys.process._
Expand Down Expand Up @@ -407,4 +408,13 @@ object Utils extends Logging {
stringWriter.toString
}
}

def withLockRequired[T](lock: Lock)(block: => T): T = {
try {
lock.lock()
block
} finally {
lock.unlock()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
package org.apache.kyuubi.operation

import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.locks.ReentrantLock

import scala.collection.JavaConverters._

import org.apache.commons.lang3.StringUtils
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TProgressUpdateResp, TProtocolVersion, TRowSet, TStatus, TStatusCode}

import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf.OPERATION_IDLE_TIMEOUT
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.OperationState._
Expand All @@ -45,7 +46,11 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin

private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None

protected def cleanup(targetState: OperationState): Unit = state.synchronized {
private val lock: ReentrantLock = new ReentrantLock()

protected def withLockRequired[T](block: => T): T = Utils.withLockRequired(lock)(block)

protected def cleanup(targetState: OperationState): Unit = withLockRequired {
if (!isTerminalState(state)) {
setState(targetState)
Option(getBackgroundHandle).foreach(_.cancel(true))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,11 @@ class KyuubiSyncThriftClient private (
/**
* Lock every rpc call to send them sequentially
*/
private def withLockAcquired[T](block: => T): T = {
try {
lock.lock()
if (!protocol.getTransport.isOpen) {
throw KyuubiSQLException.connectionDoesNotExist()
}
block
} finally lock.unlock()
private def withLockAcquired[T](block: => T): T = Utils.withLockRequired(lock) {
if (!protocol.getTransport.isOpen) {
throw KyuubiSQLException.connectionDoesNotExist()
}
block
}

private def withLockAcquiredAsyncRequest[T](block: => T): T = withLockAcquired {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class BatchJobSubmission(
override def getOperationLog: Option[OperationLog] = Option(_operationLog)

// we can not set to other state if it is canceled
private def setStateIfNotCanceled(newState: OperationState): Unit = state.synchronized {
private def setStateIfNotCanceled(newState: OperationState): Unit = withLockRequired {
if (state != CANCELED) {
setState(newState)
applicationId(_applicationInfo).foreach { appId =>
Expand Down Expand Up @@ -318,7 +318,7 @@ class BatchJobSubmission(
}
}

override def close(): Unit = state.synchronized {
override def close(): Unit = withLockRequired {
if (!isClosedOrCanceled) {
try {
getOperationLog.foreach(_.close())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi

protected def onError(action: String = "operating"): PartialFunction[Throwable, Unit] = {
case e: Throwable =>
state.synchronized {
withLockRequired {
if (isTerminalState(state)) {
warn(s"Ignore exception in terminal state with $statementId", e)
} else {
Expand Down Expand Up @@ -101,14 +101,14 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
}

override protected def afterRun(): Unit = {
state.synchronized {
withLockRequired {
if (!isTerminalState(state)) {
setState(OperationState.FINISHED)
}
}
}

override def cancel(): Unit = state.synchronized {
override def cancel(): Unit = withLockRequired {
if (!isClosedOrCanceled) {
setState(OperationState.CANCELED)
MetricsSystem.tracing(_.decCount(MetricRegistry.name(OPERATION_OPEN, opType)))
Expand All @@ -123,7 +123,7 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
}
}

override def close(): Unit = state.synchronized {
override def close(): Unit = withLockRequired {
if (!isClosedOrCanceled) {
setState(OperationState.CLOSED)
MetricsSystem.tracing(_.decCount(MetricRegistry.name(OPERATION_OPEN, opType)))
Expand Down

0 comments on commit ccacb33

Please sign in to comment.