Skip to content

Commit

Permalink
[KYUUBI #4644] Manually terminate the Py4JServer during engine shutdown
Browse files Browse the repository at this point in the history
### _Why are the changes needed?_

Due to the Py4JServer initiating with a non-daemon thread, there is a possibility of it impeding the engine's termination. Therefore, it is imperative to manually terminate the Py4JServer during engine shutdown.

```
"Thread-23" #96 prio=5 os_prio=0 cpu=7.93ms elapsed=187532.67s tid=0x00007fee840cf000 nid=0x8f runnable  [0x00007fedca6bf000]
   java.lang.Thread.State: RUNNABLE
	at java.net.PlainSocketImpl.socketAccept(java.base11.0.16/Native Method)
	at java.net.AbstractPlainSocketImpl.accept(java.base11.0.16/Unknown Source)
	at java.net.ServerSocket.implAccept(java.base11.0.16/Unknown Source)
	at java.net.ServerSocket.accept(java.base11.0.16/Unknown Source)
	at py4j.GatewayServer.run(GatewayServer.java:685)
	at java.lang.Thread.run(java.base11.0.16/Unknown Source)
```

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #4644 from cfmcgrady/pyserver-non-daemon.

Closes #4644

d4f1a57 [Fu Chen] synchronized
cdc9630 [Fu Chen] shutdown Py4JServer

Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
  • Loading branch information
cfmcgrady authored and pan3793 committed Mar 31, 2023
1 parent 4e9e647 commit 726a831
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.kyuubi.engine.spark.session
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.spark.api.python.KyuubiPythonGatewayServer
import org.apache.spark.sql.SparkSession

import org.apache.kyuubi.KyuubiSQLException
Expand Down Expand Up @@ -94,6 +95,7 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)

override def stop(): Unit = {
super.stop()
KyuubiPythonGatewayServer.shutdown()
userIsolatedSparkSessionThread.foreach(_.shutdown())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ object KyuubiPythonGatewayServer extends Logging {

val CONNECTION_FILE_PATH = Utils.createTempDir() + "/connection.info"

def start(): Unit = {
private var gatewayServer: Py4JServer = _

def start(): Unit = synchronized {

val sparkConf = new SparkConf()
val gatewayServer: Py4JServer = new Py4JServer(sparkConf)
gatewayServer = new Py4JServer(sparkConf)

gatewayServer.start()
val boundPort: Int = gatewayServer.getListeningPort
Expand Down Expand Up @@ -65,4 +67,11 @@ object KyuubiPythonGatewayServer extends Logging {
System.exit(1)
}
}

def shutdown(): Unit = synchronized {
if (gatewayServer != null) {
logInfo("shutting down the python gateway server.")
gatewayServer.shutdown()
}
}
}

0 comments on commit 726a831

Please sign in to comment.