Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KYUUBI #6411] Grpc common support #6412

Closed
wants to merge 37 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
dd39efd
fix-#4057 info:
davidyuan1223 Jan 13, 2023
86e4e1c
fix-#4057 info: modify the shellcheck errors file in ./bin
davidyuan1223 Jan 13, 2023
cb11935
Merge remote-tracking branch 'origin/fix-#4057' into fix-#4057
davidyuan1223 Jan 13, 2023
55a0a43
Merge pull request #10 from xiaoyuandajian/fix-#4057
davidyuan1223 Jan 13, 2023
c48ad38
remove the used blank lines
davidyuan1223 Mar 5, 2023
16237c2
Merge branch 'apache:master' into master
davidyuan1223 Apr 2, 2023
bfa6cbf
Merge branch 'apache:master' into master
davidyuan1223 Sep 22, 2023
e244029
Merge branch 'apache:master' into master
davidyuan1223 Sep 26, 2023
b616044
fix_4186
davidyuan1223 Sep 26, 2023
360d183
Merge branch 'master' into fix_4186
davidyuan1223 Sep 26, 2023
c83836b
Merge pull request #11 from davidyuan1223/fix_4186
davidyuan1223 Sep 26, 2023
40e80d9
Revert "fix_4186"
davidyuan1223 Sep 27, 2023
0925a4b
Merge pull request #12 from davidyuan1223/revert-11-fix_4186
davidyuan1223 Sep 27, 2023
2beccb6
Merge branch 'apache:master' into master
davidyuan1223 Sep 28, 2023
c8eb9a2
Merge branch 'apache:master' into master
davidyuan1223 Oct 10, 2023
56b91a3
fix_4186
davidyuan1223 Oct 10, 2023
57ec746
Merge pull request #13 from davidyuan1223/fix
davidyuan1223 Oct 10, 2023
72e7aea
Merge branch 'apache:master' into master
davidyuan1223 Oct 20, 2023
bcb0cf3
Merge remote-tracking branch 'origin/master'
davidyuan1223 Oct 26, 2023
8b51840
add common method to get session level config
davidyuan1223 Oct 26, 2023
6376466
Merge branch 'apache:master' into master
davidyuan1223 Dec 23, 2023
46001d4
Merge branch 'apache:master' into master
davidyuan1223 Jan 26, 2024
3d6c53b
add new module common-grpc
davidyuan1223 May 23, 2024
e16fd7f
fix_4186
davidyuan1223 Oct 10, 2023
a3a4ebd
add common method to get session level config
davidyuan1223 Oct 26, 2023
631ac9a
add new module common-grpc
davidyuan1223 May 23, 2024
0a52c2a
fork master new version
davidyuan1223 May 23, 2024
d92fdb6
Merge remote-tracking branch 'origin/grpc-common-support' into grpc-c…
davidyuan1223 May 23, 2024
4c5a4ae
fix format error
davidyuan1223 May 23, 2024
3e302aa
fix format error
davidyuan1223 May 23, 2024
c6af8ed
update settings.md
davidyuan1223 May 23, 2024
a1e0447
update
davidyuan1223 May 28, 2024
5bf1814
update
davidyuan1223 Jun 4, 2024
e031c8c
update
davidyuan1223 Jun 4, 2024
55cbae3
update
davidyuan1223 Jul 10, 2024
0c17121
update
davidyuan1223 Jul 15, 2024
2b959f9
add new module kyuubi-grpc
davidyuan1223 Jul 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add new module common-grpc
  • Loading branch information
davidyuan1223 committed May 23, 2024
commit 631ac9a7aa9a12bf5b977b03c1250e4981fb2eca
Original file line number Diff line number Diff line change
Expand Up @@ -2731,6 +2731,28 @@ object KyuubiConf {
.stringConf
.createWithDefault("ENGINE")

val ENGINE_SPARK_CONNECT_GRPC_BINDING_PORT: ConfigEntry[Int] =
buildConf("kyuubi.engine.spark.connect.grpc.bind.port")
.doc("The port is used in spark connect frontendService start GrpcServer")
.version("1.9.0")
.intConf
.createWithDefault(15002)

val ENGINE_SPARK_CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE: ConfigEntry[Int] =
buildConf("kyuubi.engine.spark.connect.grpc.max.inbound.message.size")
.doc("Sets the maximum inbound message in bytes size for the gRPC requests." +
"Requests with a larger payload will fail.")
.version("1.9.0")
.intConf
.createWithDefault(128 * 1024 * 1024)

val ENGINE_SPARK_CONNECT_GRPC_BINDING_HOST: ConfigEntry[Option[String]] =
buildConf("kyuubi.engine.spark.connect.grpc.bind.host")
.doc("Hostname or IP of the machine on which to run the grpc server in frontend service ")
.version("1.9.0")
.serverOnly
.fallbackConf(FRONTEND_BIND_HOST)

val ENGINE_SPARK_SHOW_PROGRESS: ConfigEntry[Boolean] =
buildConf("kyuubi.session.engine.spark.showProgress")
.doc("When true, show the progress bar in the Spark's engine log.")
Expand Down
392 changes: 392 additions & 0 deletions kyuubi-grpc/pom.xml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.grpc.events

import org.apache.kyuubi.grpc.operation.GrpcOperation
import org.apache.kyuubi.grpc.session.GrpcSession
import org.apache.kyuubi.grpc.utils.Clock

object OperationEventsManager {
// TODO: make this configurable
val MAX_STATEMENT_TEXT_SIZE = 65535
}

sealed abstract class OperationStatus(value: Int)

object OperationStatus {
case object Pending extends OperationStatus(0)
case object Started extends OperationStatus(1)
case object Analyzed extends OperationStatus(2)
case object ReadyForExecution extends OperationStatus(3)
case object Finished extends OperationStatus(4)
case object Failed extends OperationStatus(5)
case object Canceled extends OperationStatus(6)
case object Closed extends OperationStatus(7)
}
abstract class OperationEventsManager(operation: GrpcOperation, clock: Clock) {
private def operationId: String = operation.operationKey.operationId

private def session: GrpcSession = operation.grpcSession

private def sessionId: String = session.sessionKey.sessionId

private def sessionStatus = session.sessionEventsManager.status

protected var _status: OperationStatus = OperationStatus.Pending

private var error = Option.empty[Boolean]

private var canceled = Option.empty[Boolean]

private var producedRowCount = Option.empty[Long]

private def status: OperationStatus = _status

private def hasCanceled: Option[Boolean] = canceled

private def hasError: Option[Boolean] = error

private def getProduceRowCount: Option[Long] = producedRowCount

def postStarted(): Unit = {
assertStatus(List(OperationStatus.Pending), OperationStatus.Started)
}

def postAnalyzed(analyzedPlan: Option[Any] = None): Unit = {
assertStatus(List(OperationStatus.Started, OperationStatus.Analyzed), OperationStatus.Analyzed)
}

def postReadyForExecution(): Unit = {
assertStatus(List(OperationStatus.Analyzed), OperationStatus.ReadyForExecution)
}

def postCanceled(): Unit = {
assertStatus(
List(
OperationStatus.Started,
OperationStatus.Analyzed,
OperationStatus.ReadyForExecution,
OperationStatus.Finished,
OperationStatus.Failed),
OperationStatus.Canceled)
canceled = Some(true)
}

def postFailed(errorMessage: String): Unit = {
assertStatus(
List(
OperationStatus.Started,
OperationStatus.Analyzed,
OperationStatus.ReadyForExecution,
OperationStatus.Finished),
OperationStatus.Failed)
error = Some(true)
}

def postFinished(producedRowCountOpt: Option[Long] = None): Unit = {
assertStatus(
List(
OperationStatus.Started,
OperationStatus.ReadyForExecution),
OperationStatus.Finished)
producedRowCount = producedRowCountOpt
}

def postClosed(): Unit = {
assertStatus(
List(
OperationStatus.Finished,
OperationStatus.Failed,
OperationStatus.Canceled),
OperationStatus.Closed)
}

def status_(operationStatus: OperationStatus): Unit = {
_status = operationStatus
}

private def assertStatus(
validStatuses: List[OperationStatus],
eventStatus: OperationStatus): Unit = {
if (!validStatuses.contains(status)) {
throw new IllegalStateException(
s"""
|operationId: $operationId with status ${status}
|is not within statuses $validStatuses for event $eventStatus
|""".stripMargin)
}
// if (sessionStatus != SessionStatus.Started) {
// throw new IllegalStateException(
// s"""
// |sessionId: $sessionId with status $sessionStatus
// |is not Started for event $eventStatus
// |""".stripMargin)
// }
_status = eventStatus
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.grpc.events

import org.apache.kyuubi.grpc.session.GrpcSession
import org.apache.kyuubi.grpc.utils.Clock

sealed abstract class SessionStatus(value: Int)

object SessionStatus {
case object Pending extends SessionStatus(0)
case object Started extends SessionStatus(1)
case object Closed extends SessionStatus(2)
}

abstract class SessionEventsManager(session: GrpcSession, clock: Clock) {
private def sessionId: String = session.sessionKey.sessionId

private var _status: SessionStatus = SessionStatus.Pending

protected def status_(sessionStatus: SessionStatus): Unit = {
_status = sessionStatus
}

def status: SessionStatus = _status

def postStarted(): Unit = {
assertStatus(List(SessionStatus.Pending), SessionStatus.Started)
status_(SessionStatus.Started)
}

def postClosed(): Unit = {
assertStatus(List(SessionStatus.Started), SessionStatus.Closed)
status_(SessionStatus.Closed)
}

private def assertStatus(validStatuses: List[SessionStatus], eventStatus: SessionStatus): Unit = {
if (!validStatuses.contains(status)) {
throw new IllegalStateException(
s"""
|sessionId: $sessionId with status ${status}
|is not within statuses $validStatuses for event $eventStatus
|""".stripMargin)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.grpc.operation

import java.util.concurrent.locks.ReentrantLock

import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils}
import org.apache.kyuubi.grpc.session.GrpcSession

abstract class AbstractGrpcOperation[S <: GrpcSession](session: S) extends GrpcOperation
with Logging {
final protected val opType: String = getClass.getSimpleName
final protected val createTime = System.currentTimeMillis()
protected def key: OperationKey
final private val operationTimeout: Long = 1000
private var lock: ReentrantLock = new ReentrantLock()

protected def withLockRequired[T](block: => T): T = Utils.withLockRequired(lock)(block)

@volatile protected var startTime: Long = _
@volatile protected var completedTime: Long = _
@volatile protected var lastAccessTime: Long = createTime

@volatile protected var operationException: KyuubiSQLException = _

protected def setOperationException(ex: KyuubiSQLException): Unit = {
this.operationException = ex
}

protected def runInternal(): Unit

protected def beforeRun(): Unit

protected def afterRun(): Unit

override def run(): Unit = {
beforeRun()
try {
runInternal()
} finally {
afterRun()
}
}

override def close(): Unit

override def operationKey: OperationKey = key

override def grpcSession: S = session

}

object OperationJobTag {
def apply(prefix: String, operationKey: OperationKey): String = {
s"${prefix}_" +
s"User_${operationKey.userId}_" +
s"Session_${operationKey.sessionId}_" +
s"Operation_${operationKey.operationId}"
}

def unapply(jobTag: String, prefix: String): Option[String] = {
if (jobTag.startsWith(prefix)) Some(jobTag) else None
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.grpc.operation

import org.apache.kyuubi.grpc.events.OperationEventsManager
import org.apache.kyuubi.grpc.session.GrpcSession
import org.apache.kyuubi.operation.log.OperationLog

trait GrpcOperation {
def run(): Unit
def interrupt(): Unit
def close(): Unit

def getOperationLog: Option[OperationLog]
def isTimedOut: Boolean
def grpcSession: GrpcSession
def operationKey: OperationKey
def operationEventsManager: OperationEventsManager
}
Loading