Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implement Kyuubi Chat Engine #4544

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions build/dist
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ mkdir -p "$DISTDIR/externals/engines/spark"
mkdir -p "$DISTDIR/externals/engines/trino"
mkdir -p "$DISTDIR/externals/engines/hive"
mkdir -p "$DISTDIR/externals/engines/jdbc"
mkdir -p "$DISTDIR/externals/engines/chat"
echo "Kyuubi $VERSION $GITREVSTRING built for" > "$DISTDIR/RELEASE"
echo "Java $JAVA_VERSION" >> "$DISTDIR/RELEASE"
echo "Scala $SCALA_VERSION" >> "$DISTDIR/RELEASE"
Expand Down Expand Up @@ -313,6 +314,18 @@ for jar in $(ls "$DISTDIR/jars/"); do
fi
done

# Copy chat engines
cp "$KYUUBI_HOME/externals/kyuubi-chat-engine/target/kyuubi-chat-engine_${SCALA_VERSION}-${VERSION}.jar" "$DISTDIR/externals/engines/chat/"
cp -r "$KYUUBI_HOME"/externals/kyuubi-chat-engine/target/scala-$SCALA_VERSION/jars/*.jar "$DISTDIR/externals/engines/chat/"

# Share the jars w/ server to reduce binary size
# shellcheck disable=SC2045
for jar in $(ls "$DISTDIR/jars/"); do
if [[ -f "$DISTDIR/externals/engines/chat/$jar" ]]; then
(cd $DISTDIR/externals/engines/chat; ln -snf "../../../jars/$jar" "$DISTDIR/externals/engines/chat/$jar")
fi
done

# Copy kyuubi tools
if [[ -f "$KYUUBI_HOME/tools/spark-block-cleaner/target/spark-block-cleaner_${SCALA_VERSION}-${VERSION}.jar" ]]; then
mkdir -p "$DISTDIR/tools/spark-block-cleaner/kubernetes"
Expand Down
113 changes: 59 additions & 54 deletions docs/deployment/settings.md

Large diffs are not rendered by default.

89 changes: 89 additions & 0 deletions externals/kyuubi-chat-engine/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-parent</artifactId>
<version>1.8.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>kyuubi-chat-engine_2.12</artifactId>
<packaging>jar</packaging>
<name>Kyuubi Project Engine Chat</name>
<url>https://kyuubi.apache.org/</url>

<dependencies>
<!-- kyuubi dependency -->
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-ha_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>${hive.jdbc.artifact}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<id>prepare-test-jar</id>
<goals>
<goal>test-jar</goal>
</goals>
<phase>test-compile</phase>
</execution>
</executions>
</plugin>
</plugins>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.chat

import org.apache.kyuubi.engine.chat.session.ChatSessionManager
import org.apache.kyuubi.service.AbstractBackendService
import org.apache.kyuubi.session.SessionManager

class ChatBackendService
extends AbstractBackendService("ChatBackendService") {

override val sessionManager: SessionManager = new ChatSessionManager()

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.chat

import ChatEngine.currentEngine

import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.Utils.{addShutdownHook, JDBC_ENGINE_SHUTDOWN_PRIORITY}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_RETRY_POLICY
import org.apache.kyuubi.ha.client.RetryPolicies
import org.apache.kyuubi.service.Serverable
import org.apache.kyuubi.util.SignalRegister

class ChatEngine extends Serverable("ChatEngine") {

override val backendService = new ChatBackendService()
override val frontendServices = Seq(new ChatTBinaryFrontendService(this))

override def start(): Unit = {
super.start()
// Start engine self-terminating checker after all services are ready and it can be reached by
// all servers in engine spaces.
backendService.sessionManager.startTerminatingChecker(() => {
currentEngine.foreach(_.stop())
})
}

override protected def stopServer(): Unit = {}
}

object ChatEngine extends Logging {

val kyuubiConf: KyuubiConf = KyuubiConf()

var currentEngine: Option[ChatEngine] = None

def startEngine(): Unit = {
currentEngine = Some(new ChatEngine())
currentEngine.foreach { engine =>
engine.initialize(kyuubiConf)
engine.start()
addShutdownHook(
() => {
engine.stop()
},
JDBC_ENGINE_SHUTDOWN_PRIORITY + 1)
}
}

def main(args: Array[String]): Unit = {
SignalRegister.registerLogger(logger)

try {
Utils.fromCommandLineArgs(args, kyuubiConf)
kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
kyuubiConf.setIfMissing(HA_ZK_CONN_RETRY_POLICY, RetryPolicies.N_TIME.toString)

startEngine()
} catch {
case t: Throwable if currentEngine.isDefined =>
currentEngine.foreach { engine =>
engine.stop()
}
error("Failed to create Chat Engine", t)
throw t
case t: Throwable =>
error("Failed to create Chat Engine.", t)
throw t
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.chat

import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, ServiceDiscovery}
import org.apache.kyuubi.service.{Serverable, Service, TBinaryFrontendService}

class ChatTBinaryFrontendService(override val serverable: Serverable)
extends TBinaryFrontendService("ChatTBinaryFrontend") {

/**
* An optional `ServiceDiscovery` for [[FrontendService]] to expose itself
*/
override lazy val discoveryService: Option[Service] =
if (ServiceDiscovery.supportServiceDiscovery(conf)) {
Some(new EngineServiceDiscovery(this))
} else {
None
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.chat.operation

import org.apache.hive.service.rpc.thrift._

import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.chat.schema.{RowSet, SchemaHelper}
import org.apache.kyuubi.operation.{AbstractOperation, FetchIterator, OperationState}
import org.apache.kyuubi.operation.FetchOrientation.{FETCH_FIRST, FETCH_NEXT, FETCH_PRIOR, FetchOrientation}
import org.apache.kyuubi.session.Session

abstract class ChatOperation(session: Session) extends AbstractOperation(session) {

protected var iter: FetchIterator[Array[String]] = _

protected lazy val conf: KyuubiConf = session.sessionManager.getConf

override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
order match {
case FETCH_NEXT =>
iter.fetchNext()
case FETCH_PRIOR =>
iter.fetchPrior(rowSetSize)
case FETCH_FIRST =>
iter.fetchAbsolute(0)
}

val taken = iter.take(rowSetSize)
val resultRowSet = RowSet.toTRowSet(taken.toSeq, 1, getProtocolVersion)
resultRowSet.setStartRowOffset(iter.getPosition)
resultRowSet
}

override def cancel(): Unit = {
cleanup(OperationState.CANCELED)
}

override def close(): Unit = {
cleanup(OperationState.CLOSED)
}

protected def onError(cancel: Boolean = false): PartialFunction[Throwable, Unit] = {
// We should use Throwable instead of Exception since `java.lang.NoClassDefFoundError`
// could be thrown.
case e: Throwable =>
state.synchronized {
val errMsg = Utils.stringifyException(e)
if (state == OperationState.TIMEOUT) {
val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg")
setOperationException(ke)
throw ke
} else if (isTerminalState(state)) {
setOperationException(KyuubiSQLException(errMsg))
warn(s"Ignore exception in terminal state with $statementId: $errMsg")
} else {
error(s"Error operating $opType: $errMsg", e)
val ke = KyuubiSQLException(s"Error operating $opType: $errMsg", e)
setOperationException(ke)
setState(OperationState.ERROR)
throw ke
}
}
}

override protected def beforeRun(): Unit = {
setState(OperationState.PENDING)
setHasResultSet(true)
}

override protected def afterRun(): Unit = {}

override def getResultSetMetadata: TGetResultSetMetadataResp = {
val tTableSchema = SchemaHelper.stringTTableSchema("reply")
val resp = new TGetResultSetMetadataResp
resp.setSchema(tTableSchema)
resp.setStatus(OK_STATUS)
resp
}

override def shouldRunAsync: Boolean = false
}
Loading