Skip to content

[SPARK-14449][SQL] SparkContext should use SparkListenerInterface #12227

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1356,7 +1356,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* Register a listener to receive up-calls from events that happen during execution.
*/
@DeveloperApi
def addSparkListener(listener: SparkListener) {
def addSparkListener(listener: SparkListenerInterface) {
listenerBus.addListener(listener)
}

Expand Down Expand Up @@ -2007,15 +2007,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Use reflection to find the right constructor
val constructors = {
val listenerClass = Utils.classForName(className)
listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: SparkListener]]]
listenerClass
.getConstructors
.asInstanceOf[Array[Constructor[_ <: SparkListenerInterface]]]
}
val constructorTakingSparkConf = constructors.find { c =>
c.getParameterTypes.sameElements(Array(classOf[SparkConf]))
}
lazy val zeroArgumentConstructor = constructors.find { c =>
c.getParameterTypes.isEmpty
}
val listener: SparkListener = {
val listener: SparkListenerInterface = {
if (constructorTakingSparkConf.isDefined) {
constructorTakingSparkConf.get.newInstance(conf)
} else if (zeroArgumentConstructor.isDefined) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ import org.apache.spark.util.ListenerBus
/**
* A [[SparkListenerEvent]] bus that relays [[SparkListenerEvent]]s to its listeners
*/
private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkListenerEvent] {
private[spark] trait SparkListenerBus
extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {

protected override def doPostEvent(listener: SparkListener, event: SparkListenerEvent): Unit = {
protected override def doPostEvent(
listener: SparkListenerInterface,
event: SparkListenerEvent): Unit = {
event match {
case stageSubmitted: SparkListenerStageSubmitted =>
listener.onStageSubmitted(stageSubmitted)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._

import org.scalatest.Matchers

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.util.{ResetSystemProperties, RpcUtils}

Expand Down Expand Up @@ -377,13 +377,18 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
}

test("registering listeners via spark.extraListeners") {
val listeners = Seq(
classOf[ListenerThatAcceptsSparkConf],
classOf[FirehoseListenerThatAcceptsSparkConf],
classOf[BasicJobCounter])
val conf = new SparkConf().setMaster("local").setAppName("test")
.set("spark.extraListeners", classOf[ListenerThatAcceptsSparkConf].getName + "," +
classOf[BasicJobCounter].getName)
.set("spark.extraListeners", listeners.map(_.getName).mkString(","))
sc = new SparkContext(conf)
sc.listenerBus.listeners.asScala.count(_.isInstanceOf[BasicJobCounter]) should be (1)
sc.listenerBus.listeners.asScala
.count(_.isInstanceOf[ListenerThatAcceptsSparkConf]) should be (1)
sc.listenerBus.listeners.asScala
.count(_.isInstanceOf[FirehoseListenerThatAcceptsSparkConf]) should be (1)
}

/**
Expand Down Expand Up @@ -476,3 +481,11 @@ private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListene
var count = 0
override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
}

private class FirehoseListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkFirehoseListener {
var count = 0
override def onEvent(event: SparkListenerEvent): Unit = event match {
case job: SparkListenerJobEnd => count += 1
case _ =>
}
}
1 change: 1 addition & 0 deletions project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ object MimaExcludes {
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.SparkContext.emptyRDD"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.broadcast.HttpBroadcastFactory"),
// SPARK-14358 SparkListener from trait to abstract class
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.SparkContext.addSparkListener"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.JavaSparkListener"),
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.SparkFirehoseListener"),
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.scheduler.SparkListener"),
Expand Down