From 8056235ec150f24a539c831973e7f36fc8bb6c0b Mon Sep 17 00:00:00 2001 From: senmiaoliu Date: Thu, 19 Sep 2024 11:02:20 +0800 Subject: [PATCH] [KYUUBI #6696] Fix Trino Status Printer to Prevent Thread Leak MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # :mag: Description ## Issue References ๐Ÿ”— ## Describe Your Solution ๐Ÿ”ง - use `newDaemonSingleThreadScheduledExecutor` avoid `timer` thread leak - reduce same status info out ## Types of changes :bookmark: - [x] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan ๐Ÿงช #### Behavior Without This Pull Request :coffin: #### Behavior With This Pull Request :tada: #### Related Unit Tests --- # Checklist ๐Ÿ“ - [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6696 from lsm1/branch-fix-trino-printer. Closes #6696 01f917cb7 [senmiaoliu] fix style 0d20fd1f9 [senmiaoliu] fix trino info printer thread leak Authored-by: senmiaoliu Signed-off-by: senmiaoliu --- .../kyuubi/engine/trino/TrinoStatement.scala | 29 ++++++++++++------- .../engine/trino/TrinoStatusPrinter.scala | 12 ++++++-- .../trino/operation/ExecuteStatement.scala | 1 + 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatement.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatement.scala index 51deee84a35..b8d0bba3165 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatement.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatement.scala @@ -17,8 +17,8 @@ package org.apache.kyuubi.engine.trino -import java.util.{Timer, TimerTask} -import java.util.concurrent.Executors +import java.util.OptionalDouble +import java.util.concurrent.{Executors, TimeUnit} import scala.annotation.tailrec import scala.collection.JavaConverters._ @@ -38,6 +38,7 @@ import org.apache.kyuubi.config.KyuubiConf.ENGINE_TRINO_SHOW_PROGRESS import org.apache.kyuubi.config.KyuubiConf.ENGINE_TRINO_SHOW_PROGRESS_DEBUG import org.apache.kyuubi.engine.trino.TrinoConf.DATA_PROCESSING_POOL_SIZE import org.apache.kyuubi.operation.log.OperationLog +import org.apache.kyuubi.util.ThreadUtils /** * Trino client communicate with trino cluster. @@ -58,7 +59,9 @@ class TrinoStatement( private lazy val showProcess = kyuubiConf.get(ENGINE_TRINO_SHOW_PROGRESS) private lazy val showDebug = kyuubiConf.get(ENGINE_TRINO_SHOW_PROGRESS_DEBUG) - private lazy val timer = new Timer("refresh status info", true) + private val timer = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("Trino-Status-Printer", false) + private var lastStats: OptionalDouble = OptionalDouble.empty() implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(dataProcessingPoolSize)) @@ -105,7 +108,7 @@ class TrinoStatement( getData() } } else { - timer.cancel() + timer.shutdown() Verify.verify(trino.isFinished) if (operationLog.isDefined && showProcess) { TrinoStatusPrinter.printStatusInfo(trino, operationLog.get, showDebug) @@ -153,18 +156,22 @@ class TrinoStatement( } def printStatusInfo(): Unit = { if (operationLog.isDefined && showProcess) { - timer.schedule( - new TimerTask { - override def run(): Unit = { - if (trino.isRunning) { - TrinoStatusPrinter.printStatusInfo(trino, operationLog.get, showDebug) - } + timer.scheduleWithFixedDelay( + () => { + if (trino.isRunning) { + lastStats = + TrinoStatusPrinter.printStatusInfo(trino, operationLog.get, showDebug, lastStats) } }, 500L, - kyuubiConf.get(KyuubiConf.ENGINE_TRINO_SHOW_PROGRESS_UPDATE_INTERVAL)) + kyuubiConf.get(KyuubiConf.ENGINE_TRINO_SHOW_PROGRESS_UPDATE_INTERVAL), + TimeUnit.MILLISECONDS) } } + + def stopPrinter(): Unit = { + timer.shutdown() + } } object TrinoStatement { diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatusPrinter.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatusPrinter.scala index 2654f54133b..70a4d88aa52 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatusPrinter.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatusPrinter.scala @@ -17,6 +17,7 @@ package org.apache.kyuubi.engine.trino +import java.util.OptionalDouble import java.util.concurrent.TimeUnit._ import io.airlift.units.DataSize @@ -35,7 +36,8 @@ object TrinoStatusPrinter { def printStatusInfo( client: StatementClient, operationLog: OperationLog, - debug: Boolean = false): Unit = { + debug: Boolean = false, + lastStats: OptionalDouble = null): OptionalDouble = { val out = new TrinoConsoleProgressBar(operationLog) val results = if (client.isRunning) { @@ -46,11 +48,16 @@ object TrinoStatusPrinter { val stats = results.getStats + if (lastStats != null && + stats.getProgressPercentage.equals(lastStats)) { + return lastStats + } + val wallTime = Duration.succinctDuration(stats.getElapsedTimeMillis(), MILLISECONDS) val nodes = stats.getNodes if ((nodes == 0) || (stats.getTotalSplits == 0)) { - return + return stats.getProgressPercentage } // Query 12, FINISHED, 1 node @@ -122,6 +129,7 @@ object TrinoStatusPrinter { s"[${formatCountRate(stats.getProcessedRows(), wallTime, false)} rows/s, " + s"${formatDataRate(DataSize.ofBytes(stats.getProcessedBytes()), wallTime, true)}]" out.printLine(statsLine) + stats.getProgressPercentage } def percentage(count: Double, total: Double): Int = { diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala index 0ba2297c394..250b8d64b1e 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala @@ -127,6 +127,7 @@ class ExecuteStatement( } catch { onError(cancel = true) } finally { + trinoStatement.stopPrinter() shutdownTimeoutMonitor() } }