diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala index 677af9a0394..79f38ce35a4 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala @@ -20,6 +20,7 @@ package org.apache.kyuubi.engine.spark.session import java.util.concurrent.{ScheduledExecutorService, TimeUnit} import org.apache.hive.service.rpc.thrift.TProtocolVersion +import org.apache.spark.api.python.KyuubiPythonGatewayServer import org.apache.spark.sql.SparkSession import org.apache.kyuubi.KyuubiSQLException @@ -94,6 +95,7 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession) override def stop(): Unit = { super.stop() + KyuubiPythonGatewayServer.shutdown() userIsolatedSparkSessionThread.foreach(_.shutdown()) } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/api/python/KyuubiPythonGatewayServer.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/api/python/KyuubiPythonGatewayServer.scala index 7e15ffe05a6..8cf8d685c86 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/api/python/KyuubiPythonGatewayServer.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/api/python/KyuubiPythonGatewayServer.scala @@ -30,10 +30,12 @@ object KyuubiPythonGatewayServer extends Logging { val CONNECTION_FILE_PATH = Utils.createTempDir() + "/connection.info" - def start(): Unit = { + private var gatewayServer: Py4JServer = _ + + def start(): Unit = synchronized { val sparkConf = new SparkConf() - val gatewayServer: Py4JServer = new Py4JServer(sparkConf) + gatewayServer = new Py4JServer(sparkConf) gatewayServer.start() val boundPort: Int = gatewayServer.getListeningPort @@ -65,4 +67,11 @@ object KyuubiPythonGatewayServer extends Logging { System.exit(1) } } + + def shutdown(): Unit = synchronized { + if (gatewayServer != null) { + logInfo("shutting down the python gateway server.") + gatewayServer.shutdown() + } + } }