From 936ea1f8cb47263f7735557be7cf795d4c1d2105 Mon Sep 17 00:00:00 2001 From: odone Date: Fri, 10 Feb 2023 15:30:03 +0800 Subject: [PATCH] address --- dev/dependencyList | 2 +- .../service/AbstractBackendService.scala | 4 +- .../kyuubi/service/BackendService.scala | 4 +- .../kyuubi/server/BackendServiceMetric.scala | 4 +- .../kyuubi/server/trino/api/Query.scala | 16 +++-- .../server/trino/api/QueryManager.scala | 31 ---------- .../server/trino/api/TrinoContext.scala | 11 ++-- .../trino/api/TrinoScalaObjectMapper.scala | 9 --- .../trino/api/v1/StatementResource.scala | 7 +-- .../trino/api/TrinoClientApiSuite.scala | 2 +- .../server/trino/api/TrinoContextSuite.scala | 4 +- .../trino/api/v1/StatementResourceSuite.scala | 61 +++++++++++++------ pom.xml | 13 ++-- 13 files changed, 75 insertions(+), 93 deletions(-) delete mode 100644 kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/QueryManager.scala diff --git a/dev/dependencyList b/dev/dependencyList index 2035b95dea8..9813bb56288 100644 --- a/dev/dependencyList +++ b/dev/dependencyList @@ -69,7 +69,7 @@ jackson-annotations/2.14.2//jackson-annotations-2.14.2.jar jackson-core/2.14.2//jackson-core-2.14.2.jar jackson-databind/2.14.2//jackson-databind-2.14.2.jar jackson-dataformat-yaml/2.14.2//jackson-dataformat-yaml-2.14.2.jar -jackson-datatype-jdk8/2.12.3//jackson-datatype-jdk8-2.12.3.jar +jackson-datatype-jdk8/2.14.2//jackson-datatype-jdk8-2.14.2.jar jackson-datatype-jsr310/2.14.2//jackson-datatype-jsr310-2.14.2.jar jackson-jaxrs-base/2.14.2//jackson-jaxrs-base-2.14.2.jar jackson-jaxrs-json-provider/2.14.2//jackson-jaxrs-json-provider-2.14.2.jar diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala index e19858e86e5..b9e254508dd 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala @@ -158,11 +158,11 @@ abstract class AbstractBackendService(name: String) override def getOperationStatus( operationHandle: OperationHandle, - maxWait: Long = timeout): OperationStatus = { + maxWait: Option[Long]): OperationStatus = { val operation = sessionManager.operationManager.getOperation(operationHandle) if (operation.shouldRunAsync) { try { - val waitTime = maxWait + val waitTime = maxWait.getOrElse(timeout) operation.getBackgroundHandle.get(waitTime, TimeUnit.MILLISECONDS) } catch { case e: TimeoutException => diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala index 4119a0f6c36..968a94197d2 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala @@ -91,7 +91,9 @@ trait BackendService { foreignTable: String): OperationHandle def getQueryId(operationHandle: OperationHandle): String - def getOperationStatus(operationHandle: OperationHandle, maxWait: Long = 0): OperationStatus + def getOperationStatus( + operationHandle: OperationHandle, + maxWait: Option[Long] = None): OperationStatus def cancelOperation(operationHandle: OperationHandle): Unit def closeOperation(operationHandle: OperationHandle): Unit def getResultSetMetadata(operationHandle: OperationHandle): TGetResultSetMetadataResp diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala index 4c4622805b6..68bf11d7f99 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala @@ -154,9 +154,9 @@ trait BackendServiceMetric extends BackendService { abstract override def getOperationStatus( operationHandle: OperationHandle, - maxWait: Long): OperationStatus = { + maxWait: Option[Long] = None): OperationStatus = { MetricsSystem.timerTracing(MetricsConstants.BS_GET_OPERATION_STATUS) { - super.getOperationStatus(operationHandle) + super.getOperationStatus(operationHandle, maxWait) } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/Query.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/Query.scala index 94d4b98d704..c8c9e2fd6c1 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/Query.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/Query.scala @@ -51,7 +51,7 @@ case class Query( def getQueryResults(token: Long, uriInfo: UriInfo, maxWait: Long = 0): QueryResults = { val status = - be.getOperationStatus(queryId.operationHandle, maxWait) + be.getOperationStatus(queryId.operationHandle, Some(maxWait)) val nextUri = if (status.exception.isEmpty) { getNextUri(token + 1, uriInfo, toSlugContext(status.state)) } else null @@ -155,14 +155,12 @@ object Query { private def createSession( context: TrinoContext, backendService: BackendService): SessionHandle = { - context.session - .get("sessionId") - .fold(backendService.openSession( - TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V11, - context.user, - "", - context.remoteUserAddress.getOrElse(""), - context.session))(SessionHandle.fromUUID) + backendService.openSession( + TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V11, + context.user, + "", + context.remoteUserAddress.getOrElse(""), + context.session) } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/QueryManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/QueryManager.scala deleted file mode 100644 index 1d0e0eb4391..00000000000 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/QueryManager.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kyuubi.server.trino.api - -import java.util.concurrent.ConcurrentHashMap - -class QueryManager() { - private val queries = new ConcurrentHashMap[QueryId, Query]() - - def removeQuery(queryId: QueryId): Unit = {} - - def getQuery(queryId: QueryId): Option[Query] = None - - def addQuery(query: Query): Unit = {} - -} diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/TrinoContext.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/TrinoContext.scala index dd7797d59d2..9e77139040a 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/TrinoContext.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/TrinoContext.scala @@ -26,10 +26,9 @@ import scala.collection.JavaConverters._ import io.trino.client.{ClientStandardTypes, ClientTypeSignature, Column, QueryError, QueryResults, StatementStats, Warning} import io.trino.client.ProtocolHeaders.TRINO_HEADERS -import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet} - import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet, TTypeId} +import org.apache.kyuubi.operation.OperationState.FINISHED import org.apache.kyuubi.operation.OperationStatus /** @@ -193,14 +192,16 @@ object TrinoContext { case None => null } - val updatedNextUri = - if (rowList == null || rowList.isEmpty || rowList.get(0).isEmpty) null else nextUri + val updatedNextUri = queryStatus.state match { + case FINISHED if rowList == null || rowList.isEmpty || rowList.get(0).isEmpty => null + case _ => nextUri + } new QueryResults( queryId, queryHtmlUri, nextUri, - nextUri, + updatedNextUri, columnList, rowList, StatementStats.builder.setState(queryStatus.state.name()).setQueued(false) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/TrinoScalaObjectMapper.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/TrinoScalaObjectMapper.scala index 2ff8406f621..f6055927ac2 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/TrinoScalaObjectMapper.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/TrinoScalaObjectMapper.scala @@ -26,15 +26,6 @@ class TrinoScalaObjectMapper extends ContextResolver[ObjectMapper] { private lazy val mapper = new ObjectMapper() .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) -// .disable(MapperFeature.AUTO_DETECT_CREATORS) -// .disable(MapperFeature.AUTO_DETECT_FIELDS) -// .disable(MapperFeature.AUTO_DETECT_SETTERS) -// .disable(MapperFeature.AUTO_DETECT_GETTERS) -// .disable(MapperFeature.AUTO_DETECT_IS_GETTERS) -// .disable(MapperFeature.USE_GETTERS_AS_SETTERS) -// .disable(MapperFeature.CAN_OVERRIDE_ACCESS_MODIFIERS) -// .disable(MapperFeature.INFER_PROPERTY_MUTATORS) -// .disable(MapperFeature.ALLOW_FINAL_FIELDS_AS_MUTATORS) .registerModule(new Jdk8Module) override def getContext(aClass: Class[_]): ObjectMapper = mapper diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/v1/StatementResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/v1/StatementResource.scala index d8cdf8dfdbb..ab783f8acce 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/v1/StatementResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/v1/StatementResource.scala @@ -32,7 +32,7 @@ import io.swagger.v3.oas.annotations.tags.Tag import io.trino.client.QueryResults import org.apache.kyuubi.Logging -import org.apache.kyuubi.server.trino.api.{ApiRequestContext, KyuubiTrinoOperationTranslator, Query, QueryId, QueryManager, Slug, TrinoContext} +import org.apache.kyuubi.server.trino.api.{ApiRequestContext, KyuubiTrinoOperationTranslator, Query, QueryId, Slug, TrinoContext} import org.apache.kyuubi.server.trino.api.Slug.Context.{EXECUTING_QUERY, QUEUED_QUERY} import org.apache.kyuubi.server.trino.api.v1.dto.Ok import org.apache.kyuubi.service.BackendService @@ -42,7 +42,6 @@ import org.apache.kyuubi.service.BackendService private[v1] class StatementResource extends ApiRequestContext with Logging { lazy val translator = new KyuubiTrinoOperationTranslator(fe.be) - lazy val queryManager = new QueryManager() @ApiResponse( responseCode = "200", @@ -117,7 +116,6 @@ private[v1] class StatementResource extends ApiRequestContext with Logging { case NonFatal(e) => val errorMsg = s"Error executing for query id $queryId" - e.printStackTrace() error(errorMsg, e) throw badRequest(NOT_FOUND, "Query not found") }.get @@ -150,7 +148,6 @@ private[v1] class StatementResource extends ApiRequestContext with Logging { case NonFatal(e) => val errorMsg = s"Error executing for query id $queryId" - e.printStackTrace() error(errorMsg, e) throw badRequest(NOT_FOUND, "Query not found") }.get @@ -225,7 +222,7 @@ private[v1] class StatementResource extends ApiRequestContext with Logging { } private def badRequest(status: Response.Status, message: String) = - throw new WebApplicationException( + new WebApplicationException( Response.status(status) .`type`(TEXT_PLAIN_TYPE) .entity(message) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoClientApiSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoClientApiSuite.scala index 50fa7d9ef47..c88b5c9409f 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoClientApiSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoClientApiSuite.scala @@ -54,7 +54,7 @@ class TrinoClientApiSuite extends KyuubiFunSuite with TrinoRestFrontendTestHelpe val result1 = execute(trino1) val sessionId1 = trino1.getSetSessionProperties.asScala.get("sessionId") assert(result1 == List(List(2))) - assert(sessionId == sessionId1) + assert(sessionId != sessionId1) trino.close() } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoContextSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoContextSuite.scala index 8d7b2bf2ccf..87c8eda968a 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoContextSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoContextSuite.scala @@ -85,7 +85,7 @@ class TrinoContextSuite extends KyuubiFunSuite with RestFrontendTestHelper { val metadataResp = fe.be.getResultSetMetadata(opHandle) val tRowSet = fe.be.fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 1000, false) - val status = fe.be.getOperationStatus(opHandle) + val status = fe.be.getOperationStatus(opHandle, Some(0)) val uri = new URI("sfdsfsdfdsf") val results = TrinoContext @@ -112,7 +112,7 @@ class TrinoContextSuite extends KyuubiFunSuite with RestFrontendTestHelper { val metadataResp = fe.be.getResultSetMetadata(opHandle) val tRowSet = fe.be.fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 1000, false) - val status = fe.be.getOperationStatus(opHandle) + val status = fe.be.getOperationStatus(opHandle, Some(0)) val uri = new URI("sfdsfsdfdsf") val results = TrinoContext diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/v1/StatementResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/v1/StatementResourceSuite.scala index 207f9e3b0b2..adbf389c931 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/v1/StatementResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/v1/StatementResourceSuite.scala @@ -22,16 +22,22 @@ import javax.ws.rs.core.{MediaType, Response} import scala.collection.JavaConverters._ +import io.trino.client.{QueryError, QueryResults} import io.trino.client.ProtocolHeaders.TRINO_HEADERS -import io.trino.client.QueryResults -import org.apache.kyuubi.{KyuubiSQLException, TrinoRestFrontendTestHelper} +import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException, TrinoRestFrontendTestHelper} import org.apache.kyuubi.operation.{OperationHandle, OperationState} import org.apache.kyuubi.server.trino.api.TrinoContext import org.apache.kyuubi.server.trino.api.v1.dto.Ok import org.apache.kyuubi.session.SessionHandle -class StatementResourceSuite extends TrinoRestFrontendTestHelper { +class StatementResourceSuite extends KyuubiFunSuite with TrinoRestFrontendTestHelper { + + case class TrinoResponse( + response: Option[Response] = None, + queryError: Option[QueryError] = None, + data: List[List[Any]] = List[List[Any]](), + isEnd: Boolean = false) test("statement test") { val response = webTarget.path("v1/statement/test").request().get() @@ -39,10 +45,28 @@ class StatementResourceSuite extends TrinoRestFrontendTestHelper { assert(result == new Ok("trino server is running")) } + test("statement submit for query error") { + + val response = webTarget.path("v1/statement") + .request().post(Entity.entity("select a", MediaType.TEXT_PLAIN_TYPE)) + + val trinoResponseIter = Iterator.iterate(TrinoResponse(response = Option(response)))(getData) + val isErr = trinoResponseIter.takeWhile(_.isEnd == false).exists { t => + t.queryError != None && t.response == None + } + assert(isErr == true) + } + test("statement submit and get result") { val response = webTarget.path("v1/statement") .request().post(Entity.entity("select 1", MediaType.TEXT_PLAIN_TYPE)) - checkResult(response) + + val trinoResponseIter = Iterator.iterate(TrinoResponse(response = Option(response)))(getData) + val dataSet = trinoResponseIter + .takeWhile(_.isEnd == false) + .map(_.data) + .flatten.toList + assert(dataSet == List(List(1))) } test("query cancel") { @@ -74,20 +98,21 @@ class StatementResourceSuite extends TrinoRestFrontendTestHelper { } - private def checkResult(response: Response): Unit = { - assert(response.getStatus == 200) - val qr = response.readEntity(classOf[QueryResults]) - if (qr.getData.iterator().hasNext) { - val resultSet = qr.getData.iterator() - assert(resultSet.next.asScala == List(1)) - } - if (qr.getNextUri != null) { - val path = qr.getNextUri.getPath - val headers = response.getHeaders - val nextResponse = webTarget.path(path).request().headers(headers).get() - checkResult(nextResponse) - } - + private def getData(current: TrinoResponse): TrinoResponse = { + current.response.map { response => + assert(response.getStatus == 200) + val qr = response.readEntity(classOf[QueryResults]) + val nextData = Option(qr.getData) + .map(_.asScala.toList.map(_.asScala.toList)) + .getOrElse(List[List[Any]]()) + val nextResponse = Option(qr.getNextUri).map { + uri => + val path = uri.getPath + val headers = response.getHeaders + webTarget.path(path).request().headers(headers).get() + } + TrinoResponse(nextResponse, Option(qr.getError), nextData) + }.getOrElse(TrinoResponse(isEnd = true)) } } diff --git a/pom.xml b/pom.xml index 63af01f2a9f..06fba720d3e 100644 --- a/pom.xml +++ b/pom.xml @@ -184,7 +184,6 @@ 4.1.0 1.7.36 1.33 - 2.12.3