Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ KYUUBIADMIN: 'KYUUBIADMIN';

SESSION: 'SESSION';

ENGINE: 'ENGINE';

RESTART: 'RESTART';

BACKQUOTED_IDENTIFIER
: '`' ( ~'`' | '``' )* '`'
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ statement

runnableCommand
: (DESC | DESCRIBE) SESSION #describeSession
| (DESC | DESCRIBE) ENGINE #describeEngine
| RESTART ENGINE #restartEngine
;
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import org.apache.kyuubi.util.{ThreadUtils, ThriftUtils}
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay

class KyuubiSyncThriftClient private (
host: String,
port: Int,
protocol: TProtocol,
engineAliveProbeProtocol: Option[TProtocol],
engineAliveProbeInterval: Long,
Expand Down Expand Up @@ -77,6 +79,9 @@ class KyuubiSyncThriftClient private (
@VisibleForTesting
private[kyuubi] def getEngineAliveProbeProtocol: Option[TProtocol] = engineAliveProbeProtocol

private[kyuubi] def getHost: String = this.host
private[kyuubi] def getPort: Int = this.port

private def shutdownAsyncRequestExecutor(): Unit = {
Option(asyncRequestExecutor).filterNot(_.isShutdown).foreach(ThreadUtils.shutdown(_))
asyncRequestInterrupted = true
Expand Down Expand Up @@ -483,6 +488,8 @@ private[kyuubi] object KyuubiSyncThriftClient extends Logging {
None
}
new KyuubiSyncThriftClient(
host,
port,
tProtocol,
aliveProbeProtocol,
aliveProbeInterval,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ private[kyuubi] class EngineRef(
// a small amount of time for timeout
private val LOCK_TIMEOUT_SPAN_FACTOR = if (Utils.isTesting) 0.5 else 0.1

private var builder: ProcBuilder = _
@volatile private var builder: ProcBuilder = _

private[kyuubi] def getEngineRefId(): String = engineRefId

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
package org.apache.kyuubi.session

import java.util.Base64
import java.util.concurrent.locks.ReentrantReadWriteLock

import scala.collection.JavaConverters._

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.client.KyuubiSyncThriftClient
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
Expand Down Expand Up @@ -80,7 +81,7 @@ class KyuubiSessionImpl(
handle.identifier.toString,
sessionManager.applicationManager,
sessionManager.engineStartupProcessSemaphore)
private[kyuubi] val launchEngineOp = sessionManager.operationManager
@volatile private[kyuubi] var launchEngineOp = sessionManager.operationManager
.newLaunchEngineOperation(this, sessionConf.get(SESSION_ENGINE_LAUNCH_ASYNC))

private lazy val sessionUserSignBase64: String =
Expand All @@ -101,7 +102,18 @@ class KyuubiSessionImpl(
}

@volatile private var _client: KyuubiSyncThriftClient = _
def client: KyuubiSyncThriftClient = _client

private val lock = new ReentrantReadWriteLock()

private def withReadLockAcquired[T](block: => T): T = Utils.withLockRequired(lock.readLock()) {
block
}

private def withWriteLockAcquired[T](block: => T): T = Utils.withLockRequired(lock.writeLock()) {
block
}

def client: KyuubiSyncThriftClient = withReadLockAcquired { _client }

@volatile private var _engineSessionHandle: SessionHandle = _

Expand All @@ -119,6 +131,17 @@ class KyuubiSessionImpl(
engineLastAlive = System.currentTimeMillis()
}

def reLaunchEngine(): Unit = handleSessionException {
engineLaunched = false
withDiscoveryClient(sessionConf) { discoveryClient =>
engine.deregister(discoveryClient, (_client.getHost, _client.getPort))
}
launchEngineOp = sessionManager.operationManager
.newLaunchEngineOperation(this, sessionConf.get(SESSION_ENGINE_LAUNCH_ASYNC))
runOperation(launchEngineOp)
Copy link
Member

@wForget wForget Dec 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will re-execute openEngineSession and we need to consider whether some states are as expected.

  • openedTime will be reset: sessionEvent.openedTime = System.currentTimeMillis()
  • do we need to update engineLastAlive?
  • EngineRef#builder is changed, volatile required?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed engineLastAlive , EngineRef#builder.
I think openedTime means engine openedTime, so we don't need to update openedTime.And i changed the org.apache.kyuubi.events.KyuubiSessionEvent#openedTime comment.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You shall need to wrap this method with _client.withLockAcquired.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in case that other operations access the _client during restarting engine.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may not be able to use the lock of _client because _client will be assigned a new object after this method.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in case that other operations access the _client during restarting engine.

This seems difficult to avoid

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or we need to add ReadWriteLock for the client method of KyuubiSessionImpl.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or we need to add ReadWriteLock for the client method of KyuubiSessionImpl.

submit Operation from multiple threads will meet problem , I will fix it.

engineLastAlive = System.currentTimeMillis()
}

private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] = None): Unit =
handleSessionException {
withDiscoveryClient(sessionConf) { discoveryClient =>
Expand Down Expand Up @@ -158,17 +181,22 @@ class KyuubiSessionImpl(
}

try {
val passwd =
val passwd = {
if (sessionManager.getConf.get(ENGINE_SECURITY_ENABLED)) {
InternalSecurityAccessor.get().issueToken()
} else {
Option(password).filter(_.nonEmpty).getOrElse("anonymous")
}
_client = KyuubiSyncThriftClient.createClient(user, passwd, host, port, sessionConf)
_engineSessionHandle =
_client.openSession(protocol, user, passwd, openEngineSessionConf)
logSessionInfo(s"Connected to engine [$host:$port]/[${client.engineId.getOrElse("")}]" +
s" with ${_engineSessionHandle}]")
}
withWriteLockAcquired{
if (_client != null) _client.closeSession()
_client = KyuubiSyncThriftClient.createClient(user, passwd, host, port, sessionConf)
_engineSessionHandle =
_client.openSession(protocol, user, passwd, openEngineSessionConf)
logSessionInfo(
s"Connected to engine [$host:$port]/[${client.engineId.getOrElse("")}]" +
s" with ${_engineSessionHandle}]")
}
shouldRetry = false
} catch {
case e: TTransportException
Expand Down Expand Up @@ -199,7 +227,7 @@ class KyuubiSessionImpl(
attempt += 1
if (shouldRetry && _client != null) {
try {
_client.closeSession()
withWriteLockAcquired{ _client.closeSession() }
} catch {
case e: Throwable =>
warn(
Expand Down Expand Up @@ -269,7 +297,9 @@ class KyuubiSessionImpl(
super.close()
sessionManager.credentialsManager.removeSessionCredentialsEpoch(handle.identifier.toString)
try {
if (_client != null) _client.closeSession()
if (_client != null) {
withWriteLockAcquired { _client.closeSession() }
}
} finally {
openSessionError.foreach { _ => if (engine != null) engine.close() }
sessionEvent.endTime = System.currentTimeMillis()
Expand Down Expand Up @@ -314,6 +344,7 @@ class KyuubiSessionImpl(

def checkEngineConnectionAlive(): Boolean = {
try {
if (!engineLaunched) return true
if (Option(client).exists(_.engineConnectionClosed)) return false
if (!aliveProbeEnabled) return true
getInfo(TGetInfoType.CLI_DBMS_VER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.kyuubi.sql.parser.server
import org.apache.kyuubi.sql.{KyuubiSqlBaseParser, KyuubiSqlBaseParserBaseVisitor}
import org.apache.kyuubi.sql.KyuubiSqlBaseParser.SingleStatementContext
import org.apache.kyuubi.sql.plan.{KyuubiTreeNode, PassThroughNode}
import org.apache.kyuubi.sql.plan.command.{DescribeSession, RunnableCommand}
import org.apache.kyuubi.sql.plan.command.{DescribeEngine, DescribeSession, RestartEngine, RunnableCommand}

class KyuubiAstBuilder extends KyuubiSqlBaseParserBaseVisitor[AnyRef] {

Expand All @@ -44,4 +44,14 @@ class KyuubiAstBuilder extends KyuubiSqlBaseParserBaseVisitor[AnyRef] {
: RunnableCommand = {
DescribeSession()
}

override def visitDescribeEngine(ctx: KyuubiSqlBaseParser.DescribeEngineContext)
: RunnableCommand = {
DescribeEngine()
}

override def visitRestartEngine(ctx: KyuubiSqlBaseParser.RestartEngineContext)
: RunnableCommand = {
RestartEngine()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.sql.plan.command

import scala.collection.mutable.ListBuffer

import org.apache.kyuubi.operation.IterableFetchIterator
import org.apache.kyuubi.session.{KyuubiSession, KyuubiSessionImpl}
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TTypeId
import org.apache.kyuubi.sql.schema.{Column, Row, Schema}

/**
* A runnable node for description the current session.
*
* The syntax of using this command in SQL is:
* {{{
* [DESC|DESCRIBE] ENGINE;
* }}}
*/
case class DescribeEngine() extends RunnableCommand {

override def run(kyuubiSession: KyuubiSession): Unit = {
val rows = Seq(kyuubiSession).map { session =>
val client = session.asInstanceOf[KyuubiSessionImpl].client
val values = new ListBuffer[String]()
values += client.engineId.getOrElse("")
values += client.engineName.getOrElse("")
values += client.engineUrl.getOrElse("")
Row(values.toList)
}
iter = new IterableFetchIterator(rows)
}

override def resultSchema: Schema = {
Schema(DescribeEngine.outputCols().toList)
}

override def name(): String = "Describe Engine Node"
}

object DescribeEngine {

def outputCols(): Seq[Column] = {
Seq(
Column("ENGINE_ID", TTypeId.STRING_TYPE, Some("Kyuubi engine identify")),
Column("ENGINE_NAME", TTypeId.STRING_TYPE, Some("Kyuubi engine name")),
Column("ENGINE_URL", TTypeId.STRING_TYPE, Some("Kyuubi engine url")))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ object DescribeSession {

def outputCols(): Seq[Column] = {
Seq(
Column("id", TTypeId.STRING_TYPE, Some("Kyuubi session identify")),
Column("user", TTypeId.STRING_TYPE, Some("Kyuubi session user")),
Column("type", TTypeId.STRING_TYPE, Some("Kyuubi session type")))
Column("SESSION_ID", TTypeId.STRING_TYPE, Some("Kyuubi session identify")),
Column("SESSION_USER", TTypeId.STRING_TYPE, Some("Kyuubi session user")),
Column("SESSION_TYPE", TTypeId.STRING_TYPE, Some("Kyuubi session type")))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.sql.plan.command

import scala.collection.mutable.ListBuffer

import org.apache.kyuubi.operation.IterableFetchIterator
import org.apache.kyuubi.session.{KyuubiSession, KyuubiSessionImpl}
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TTypeId
import org.apache.kyuubi.sql.schema.{Column, Row, Schema}

/**
* A runnable node for description the current session.
*
* The syntax of using this command in SQL is:
* {{{
* RESTART ENGINE;
* }}}
*/
case class RestartEngine() extends RunnableCommand {

override def run(kyuubiSession: KyuubiSession): Unit = {
val rows = Seq(kyuubiSession.asInstanceOf[KyuubiSessionImpl]).map { session =>
session.reLaunchEngine()
val client = session.client
val values = new ListBuffer[String]()
values += client.engineId.getOrElse("")
values += client.engineName.getOrElse("")
values += client.engineUrl.getOrElse("")
Row(values.toList)
}
iter = new IterableFetchIterator(rows)
}

override def resultSchema: Schema = {
Schema(RestartEngine.outputCols().toList)
}

override def name(): String = "Restart Session Engine Node"
}

object RestartEngine {
def outputCols(): Seq[Column] = {
Seq(
Column("ENGINE_ID", TTypeId.STRING_TYPE, Some("Kyuubi engine identify")),
Column("ENGINE_NAME", TTypeId.STRING_TYPE, Some("Kyuubi engine name")),
Column("ENGINE_URL", TTypeId.STRING_TYPE, Some("Kyuubi engine url")))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.operation.parser

class DescribeEngineSuite extends ExecutedCommandExecSuite {

test("desc/describe kyuubi engine") {
Seq("DESC", "DESCRIBE").foreach { desc =>
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery(s"KYUUBI $desc ENGINE")
assert(resultSet.next())

assert(resultSet.getMetaData.getColumnCount == 3)
assert(resultSet.getMetaData.getColumnName(1) == "ENGINE_ID")
assert(resultSet.getMetaData.getColumnName(2) == "ENGINE_NAME")
assert(resultSet.getMetaData.getColumnName(3) == "ENGINE_URL")
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ package org.apache.kyuubi.operation.parser

class DescribeSessionSuite extends ExecutedCommandExecSuite {

test("desc kyuubi session") {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("KYUUBI DESC SESSION")
assert(resultSet.next())
test("desc/describe kyuubi session") {
Seq("DESC", "DESCRIBE").foreach { desc =>
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery(s"KYUUBI $desc SESSION")
assert(resultSet.next())

assert(resultSet.getMetaData.getColumnCount == 3)
assert(resultSet.getMetaData.getColumnName(1) == "id")
assert(resultSet.getMetaData.getColumnName(2) == "user")
assert(resultSet.getMetaData.getColumnName(3) == "type")
assert(resultSet.getMetaData.getColumnCount == 3)
assert(resultSet.getMetaData.getColumnName(1) == "SESSION_ID")
assert(resultSet.getMetaData.getColumnName(2) == "SESSION_USER")
assert(resultSet.getMetaData.getColumnName(3) == "SESSION_TYPE")
}
}
}
}
Loading