From e00afd31a72350f388c4473704d5dd58f79ffa29 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 23 Mar 2021 07:38:43 +0000 Subject: [PATCH] [SPARK-34087][FOLLOW-UP][SQL] Manage ExecutionListenerBus register inside itself ### What changes were proposed in this pull request? Move `ExecutionListenerBus` register (both `ListenerBus` and `ContextCleaner` register) into itself. Also with a minor change that put `registerSparkListenerForCleanup` to a better place. ### Why are the changes needed? improve code ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass existing tests. Closes #31919 from Ngone51/SPARK-34087-followup. Authored-by: yi.wu Signed-off-by: Wenchen Fan --- .../scala/org/apache/spark/ContextCleaner.scala | 10 +++++----- .../spark/sql/util/QueryExecutionListener.scala | 16 ++++++++++------ 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index 34b3089107efe..091b5e1600d9e 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -172,11 +172,6 @@ private[spark] class ContextCleaner( registerForCleanup(rdd, CleanCheckpoint(parentId)) } - /** Register an object for cleanup. */ - private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = { - referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue)) - } - /** Register a SparkListener to be cleaned up when its owner is garbage collected. */ def registerSparkListenerForCleanup( listenerOwner: AnyRef, @@ -184,6 +179,11 @@ private[spark] class ContextCleaner( registerForCleanup(listenerOwner, CleanSparkListener(listener)) } + /** Register an object for cleanup. */ + private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = { + referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue)) + } + /** Keep cleaning RDD, shuffle, and broadcast state. */ private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) { while (!stopped) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala index d8b630d5354e0..b1742078cc125 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala @@ -76,11 +76,7 @@ trait QueryExecutionListener { class ExecutionListenerManager private[sql](session: SparkSession, loadExtensions: Boolean) extends Logging { - private val listenerBus = new ExecutionListenerBus(session.sessionUUID) - session.sparkContext.listenerBus.addToSharedQueue(listenerBus) - session.sparkContext.cleaner.foreach { cleaner => - cleaner.registerSparkListenerForCleanup(this, listenerBus) - } + private val listenerBus = new ExecutionListenerBus(this, session) if (loadExtensions) { val conf = session.sparkContext.conf @@ -128,9 +124,17 @@ class ExecutionListenerManager private[sql](session: SparkSession, loadExtension } } -private[sql] class ExecutionListenerBus(sessionUUID: String) +private[sql] class ExecutionListenerBus private(sessionUUID: String) extends SparkListener with ListenerBus[QueryExecutionListener, SparkListenerSQLExecutionEnd] { + def this(manager: ExecutionListenerManager, session: SparkSession) = { + this(session.sessionUUID) + session.sparkContext.listenerBus.addToSharedQueue(this) + session.sparkContext.cleaner.foreach { cleaner => + cleaner.registerSparkListenerForCleanup(manager, this) + } + } + override def onOtherEvent(event: SparkListenerEvent): Unit = event match { case e: SparkListenerSQLExecutionEnd => postToAll(e) case _ =>