diff --git a/dev/dependencyList b/dev/dependencyList
index 2035b95dea8..9813bb56288 100644
--- a/dev/dependencyList
+++ b/dev/dependencyList
@@ -69,7 +69,7 @@ jackson-annotations/2.14.2//jackson-annotations-2.14.2.jar
jackson-core/2.14.2//jackson-core-2.14.2.jar
jackson-databind/2.14.2//jackson-databind-2.14.2.jar
jackson-dataformat-yaml/2.14.2//jackson-dataformat-yaml-2.14.2.jar
-jackson-datatype-jdk8/2.12.3//jackson-datatype-jdk8-2.12.3.jar
+jackson-datatype-jdk8/2.14.2//jackson-datatype-jdk8-2.14.2.jar
jackson-datatype-jsr310/2.14.2//jackson-datatype-jsr310-2.14.2.jar
jackson-jaxrs-base/2.14.2//jackson-jaxrs-base-2.14.2.jar
jackson-jaxrs-json-provider/2.14.2//jackson-jaxrs-json-provider-2.14.2.jar
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
index e19858e86e5..b9e254508dd 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
@@ -158,11 +158,11 @@ abstract class AbstractBackendService(name: String)
override def getOperationStatus(
operationHandle: OperationHandle,
- maxWait: Long = timeout): OperationStatus = {
+ maxWait: Option[Long]): OperationStatus = {
val operation = sessionManager.operationManager.getOperation(operationHandle)
if (operation.shouldRunAsync) {
try {
- val waitTime = maxWait
+ val waitTime = maxWait.getOrElse(timeout)
operation.getBackgroundHandle.get(waitTime, TimeUnit.MILLISECONDS)
} catch {
case e: TimeoutException =>
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala
index 4119a0f6c36..968a94197d2 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala
@@ -91,7 +91,9 @@ trait BackendService {
foreignTable: String): OperationHandle
def getQueryId(operationHandle: OperationHandle): String
- def getOperationStatus(operationHandle: OperationHandle, maxWait: Long = 0): OperationStatus
+ def getOperationStatus(
+ operationHandle: OperationHandle,
+ maxWait: Option[Long] = None): OperationStatus
def cancelOperation(operationHandle: OperationHandle): Unit
def closeOperation(operationHandle: OperationHandle): Unit
def getResultSetMetadata(operationHandle: OperationHandle): TGetResultSetMetadataResp
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala
index 4c4622805b6..68bf11d7f99 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala
@@ -154,9 +154,9 @@ trait BackendServiceMetric extends BackendService {
abstract override def getOperationStatus(
operationHandle: OperationHandle,
- maxWait: Long): OperationStatus = {
+ maxWait: Option[Long] = None): OperationStatus = {
MetricsSystem.timerTracing(MetricsConstants.BS_GET_OPERATION_STATUS) {
- super.getOperationStatus(operationHandle)
+ super.getOperationStatus(operationHandle, maxWait)
}
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/Query.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/Query.scala
index 94d4b98d704..c8c9e2fd6c1 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/Query.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/Query.scala
@@ -51,7 +51,7 @@ case class Query(
def getQueryResults(token: Long, uriInfo: UriInfo, maxWait: Long = 0): QueryResults = {
val status =
- be.getOperationStatus(queryId.operationHandle, maxWait)
+ be.getOperationStatus(queryId.operationHandle, Some(maxWait))
val nextUri = if (status.exception.isEmpty) {
getNextUri(token + 1, uriInfo, toSlugContext(status.state))
} else null
@@ -155,14 +155,12 @@ object Query {
private def createSession(
context: TrinoContext,
backendService: BackendService): SessionHandle = {
- context.session
- .get("sessionId")
- .fold(backendService.openSession(
- TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V11,
- context.user,
- "",
- context.remoteUserAddress.getOrElse(""),
- context.session))(SessionHandle.fromUUID)
+ backendService.openSession(
+ TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V11,
+ context.user,
+ "",
+ context.remoteUserAddress.getOrElse(""),
+ context.session)
}
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/QueryManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/QueryManager.scala
deleted file mode 100644
index 1d0e0eb4391..00000000000
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/QueryManager.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.server.trino.api
-
-import java.util.concurrent.ConcurrentHashMap
-
-class QueryManager() {
- private val queries = new ConcurrentHashMap[QueryId, Query]()
-
- def removeQuery(queryId: QueryId): Unit = {}
-
- def getQuery(queryId: QueryId): Option[Query] = None
-
- def addQuery(query: Query): Unit = {}
-
-}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/TrinoContext.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/TrinoContext.scala
index dd7797d59d2..9e77139040a 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/TrinoContext.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/TrinoContext.scala
@@ -26,10 +26,9 @@ import scala.collection.JavaConverters._
import io.trino.client.{ClientStandardTypes, ClientTypeSignature, Column, QueryError, QueryResults, StatementStats, Warning}
import io.trino.client.ProtocolHeaders.TRINO_HEADERS
-import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}
-
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet, TTypeId}
+import org.apache.kyuubi.operation.OperationState.FINISHED
import org.apache.kyuubi.operation.OperationStatus
/**
@@ -193,14 +192,16 @@ object TrinoContext {
case None => null
}
- val updatedNextUri =
- if (rowList == null || rowList.isEmpty || rowList.get(0).isEmpty) null else nextUri
+ val updatedNextUri = queryStatus.state match {
+ case FINISHED if rowList == null || rowList.isEmpty || rowList.get(0).isEmpty => null
+ case _ => nextUri
+ }
new QueryResults(
queryId,
queryHtmlUri,
nextUri,
- nextUri,
+ updatedNextUri,
columnList,
rowList,
StatementStats.builder.setState(queryStatus.state.name()).setQueued(false)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/TrinoScalaObjectMapper.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/TrinoScalaObjectMapper.scala
index 2ff8406f621..f6055927ac2 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/TrinoScalaObjectMapper.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/TrinoScalaObjectMapper.scala
@@ -26,15 +26,6 @@ class TrinoScalaObjectMapper extends ContextResolver[ObjectMapper] {
private lazy val mapper = new ObjectMapper()
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
-// .disable(MapperFeature.AUTO_DETECT_CREATORS)
-// .disable(MapperFeature.AUTO_DETECT_FIELDS)
-// .disable(MapperFeature.AUTO_DETECT_SETTERS)
-// .disable(MapperFeature.AUTO_DETECT_GETTERS)
-// .disable(MapperFeature.AUTO_DETECT_IS_GETTERS)
-// .disable(MapperFeature.USE_GETTERS_AS_SETTERS)
-// .disable(MapperFeature.CAN_OVERRIDE_ACCESS_MODIFIERS)
-// .disable(MapperFeature.INFER_PROPERTY_MUTATORS)
-// .disable(MapperFeature.ALLOW_FINAL_FIELDS_AS_MUTATORS)
.registerModule(new Jdk8Module)
override def getContext(aClass: Class[_]): ObjectMapper = mapper
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/v1/StatementResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/v1/StatementResource.scala
index d8cdf8dfdbb..ab783f8acce 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/v1/StatementResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/v1/StatementResource.scala
@@ -32,7 +32,7 @@ import io.swagger.v3.oas.annotations.tags.Tag
import io.trino.client.QueryResults
import org.apache.kyuubi.Logging
-import org.apache.kyuubi.server.trino.api.{ApiRequestContext, KyuubiTrinoOperationTranslator, Query, QueryId, QueryManager, Slug, TrinoContext}
+import org.apache.kyuubi.server.trino.api.{ApiRequestContext, KyuubiTrinoOperationTranslator, Query, QueryId, Slug, TrinoContext}
import org.apache.kyuubi.server.trino.api.Slug.Context.{EXECUTING_QUERY, QUEUED_QUERY}
import org.apache.kyuubi.server.trino.api.v1.dto.Ok
import org.apache.kyuubi.service.BackendService
@@ -42,7 +42,6 @@ import org.apache.kyuubi.service.BackendService
private[v1] class StatementResource extends ApiRequestContext with Logging {
lazy val translator = new KyuubiTrinoOperationTranslator(fe.be)
- lazy val queryManager = new QueryManager()
@ApiResponse(
responseCode = "200",
@@ -117,7 +116,6 @@ private[v1] class StatementResource extends ApiRequestContext with Logging {
case NonFatal(e) =>
val errorMsg =
s"Error executing for query id $queryId"
- e.printStackTrace()
error(errorMsg, e)
throw badRequest(NOT_FOUND, "Query not found")
}.get
@@ -150,7 +148,6 @@ private[v1] class StatementResource extends ApiRequestContext with Logging {
case NonFatal(e) =>
val errorMsg =
s"Error executing for query id $queryId"
- e.printStackTrace()
error(errorMsg, e)
throw badRequest(NOT_FOUND, "Query not found")
}.get
@@ -225,7 +222,7 @@ private[v1] class StatementResource extends ApiRequestContext with Logging {
}
private def badRequest(status: Response.Status, message: String) =
- throw new WebApplicationException(
+ new WebApplicationException(
Response.status(status)
.`type`(TEXT_PLAIN_TYPE)
.entity(message)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoClientApiSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoClientApiSuite.scala
index 50fa7d9ef47..c88b5c9409f 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoClientApiSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoClientApiSuite.scala
@@ -54,7 +54,7 @@ class TrinoClientApiSuite extends KyuubiFunSuite with TrinoRestFrontendTestHelpe
val result1 = execute(trino1)
val sessionId1 = trino1.getSetSessionProperties.asScala.get("sessionId")
assert(result1 == List(List(2)))
- assert(sessionId == sessionId1)
+ assert(sessionId != sessionId1)
trino.close()
}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoContextSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoContextSuite.scala
index 8d7b2bf2ccf..87c8eda968a 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoContextSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoContextSuite.scala
@@ -85,7 +85,7 @@ class TrinoContextSuite extends KyuubiFunSuite with RestFrontendTestHelper {
val metadataResp = fe.be.getResultSetMetadata(opHandle)
val tRowSet = fe.be.fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 1000, false)
- val status = fe.be.getOperationStatus(opHandle)
+ val status = fe.be.getOperationStatus(opHandle, Some(0))
val uri = new URI("sfdsfsdfdsf")
val results = TrinoContext
@@ -112,7 +112,7 @@ class TrinoContextSuite extends KyuubiFunSuite with RestFrontendTestHelper {
val metadataResp = fe.be.getResultSetMetadata(opHandle)
val tRowSet = fe.be.fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 1000, false)
- val status = fe.be.getOperationStatus(opHandle)
+ val status = fe.be.getOperationStatus(opHandle, Some(0))
val uri = new URI("sfdsfsdfdsf")
val results = TrinoContext
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/v1/StatementResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/v1/StatementResourceSuite.scala
index 207f9e3b0b2..adbf389c931 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/v1/StatementResourceSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/v1/StatementResourceSuite.scala
@@ -22,16 +22,22 @@ import javax.ws.rs.core.{MediaType, Response}
import scala.collection.JavaConverters._
+import io.trino.client.{QueryError, QueryResults}
import io.trino.client.ProtocolHeaders.TRINO_HEADERS
-import io.trino.client.QueryResults
-import org.apache.kyuubi.{KyuubiSQLException, TrinoRestFrontendTestHelper}
+import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException, TrinoRestFrontendTestHelper}
import org.apache.kyuubi.operation.{OperationHandle, OperationState}
import org.apache.kyuubi.server.trino.api.TrinoContext
import org.apache.kyuubi.server.trino.api.v1.dto.Ok
import org.apache.kyuubi.session.SessionHandle
-class StatementResourceSuite extends TrinoRestFrontendTestHelper {
+class StatementResourceSuite extends KyuubiFunSuite with TrinoRestFrontendTestHelper {
+
+ case class TrinoResponse(
+ response: Option[Response] = None,
+ queryError: Option[QueryError] = None,
+ data: List[List[Any]] = List[List[Any]](),
+ isEnd: Boolean = false)
test("statement test") {
val response = webTarget.path("v1/statement/test").request().get()
@@ -39,10 +45,28 @@ class StatementResourceSuite extends TrinoRestFrontendTestHelper {
assert(result == new Ok("trino server is running"))
}
+ test("statement submit for query error") {
+
+ val response = webTarget.path("v1/statement")
+ .request().post(Entity.entity("select a", MediaType.TEXT_PLAIN_TYPE))
+
+ val trinoResponseIter = Iterator.iterate(TrinoResponse(response = Option(response)))(getData)
+ val isErr = trinoResponseIter.takeWhile(_.isEnd == false).exists { t =>
+ t.queryError != None && t.response == None
+ }
+ assert(isErr == true)
+ }
+
test("statement submit and get result") {
val response = webTarget.path("v1/statement")
.request().post(Entity.entity("select 1", MediaType.TEXT_PLAIN_TYPE))
- checkResult(response)
+
+ val trinoResponseIter = Iterator.iterate(TrinoResponse(response = Option(response)))(getData)
+ val dataSet = trinoResponseIter
+ .takeWhile(_.isEnd == false)
+ .map(_.data)
+ .flatten.toList
+ assert(dataSet == List(List(1)))
}
test("query cancel") {
@@ -74,20 +98,21 @@ class StatementResourceSuite extends TrinoRestFrontendTestHelper {
}
- private def checkResult(response: Response): Unit = {
- assert(response.getStatus == 200)
- val qr = response.readEntity(classOf[QueryResults])
- if (qr.getData.iterator().hasNext) {
- val resultSet = qr.getData.iterator()
- assert(resultSet.next.asScala == List(1))
- }
- if (qr.getNextUri != null) {
- val path = qr.getNextUri.getPath
- val headers = response.getHeaders
- val nextResponse = webTarget.path(path).request().headers(headers).get()
- checkResult(nextResponse)
- }
-
+ private def getData(current: TrinoResponse): TrinoResponse = {
+ current.response.map { response =>
+ assert(response.getStatus == 200)
+ val qr = response.readEntity(classOf[QueryResults])
+ val nextData = Option(qr.getData)
+ .map(_.asScala.toList.map(_.asScala.toList))
+ .getOrElse(List[List[Any]]())
+ val nextResponse = Option(qr.getNextUri).map {
+ uri =>
+ val path = uri.getPath
+ val headers = response.getHeaders
+ webTarget.path(path).request().headers(headers).get()
+ }
+ TrinoResponse(nextResponse, Option(qr.getError), nextData)
+ }.getOrElse(TrinoResponse(isEnd = true))
}
}
diff --git a/pom.xml b/pom.xml
index 63af01f2a9f..06fba720d3e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -184,7 +184,6 @@
4.1.0
1.7.36
1.33
- 2.12.3