@@ -19,13 +19,14 @@ package org.apache.spark.scheduler
1919
2020import java .util .Properties
2121import java .util .concurrent .{CountDownLatch , TimeUnit }
22- import java .util .concurrent .atomic .{AtomicBoolean , AtomicLong }
22+ import java .util .concurrent .atomic .{AtomicBoolean , AtomicInteger , AtomicLong , AtomicReference }
2323
2424import scala .annotation .meta .param
2525import scala .collection .mutable .{ArrayBuffer , HashMap , HashSet , Map }
2626import scala .util .control .NonFatal
2727
2828import org .scalatest .concurrent .{Signaler , ThreadSignaler , TimeLimits }
29+ import org .scalatest .exceptions .TestFailedException
2930import org .scalatest .time .SpanSugar ._
3031
3132import org .apache .spark ._
@@ -36,7 +37,7 @@ import org.apache.spark.rdd.{DeterministicLevel, RDD}
3637import org .apache .spark .scheduler .SchedulingMode .SchedulingMode
3738import org .apache .spark .shuffle .{FetchFailedException , MetadataFetchFailedException }
3839import org .apache .spark .storage .{BlockId , BlockManagerId , BlockManagerMaster }
39- import org .apache .spark .util .{AccumulatorContext , AccumulatorV2 , CallSite , LongAccumulator , Utils }
40+ import org .apache .spark .util .{AccumulatorContext , AccumulatorV2 , CallSite , LongAccumulator , ThreadUtils , Utils }
4041
4142class DAGSchedulerEventProcessLoopTester (dagScheduler : DAGScheduler )
4243 extends DAGSchedulerEventProcessLoop (dagScheduler) {
@@ -788,6 +789,36 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
788789 }
789790 }
790791
792+ test(" SPARK-28967 properties must be cloned before posting to listener bus for 0 partition" ) {
793+ val properties = new Properties ()
794+ val func = (context : TaskContext , it : Iterator [(_)]) => 1
795+ val resultHandler = (taskIndex : Int , result : Int ) => {}
796+ val assertionError = new AtomicReference [TestFailedException ](
797+ new TestFailedException (" Listener didn't receive expected JobStart event" , 0 ))
798+ val listener = new SparkListener () {
799+ override def onJobStart (event : SparkListenerJobStart ): Unit = {
800+ try {
801+ assert(event.properties.equals(properties), " Expected same content of properties, " +
802+ s " but got properties with different content. props in caller ${properties} / " +
803+ s " props in event ${event.properties}" )
804+ assert(event.properties.ne(properties), " Expected instance with different identity, " +
805+ " but got same instance." )
806+ assertionError.set(null )
807+ } catch {
808+ case e : TestFailedException => assertionError.set(e)
809+ }
810+ }
811+ }
812+ sc.addSparkListener(listener)
813+
814+ // 0 partition
815+ val testRdd = new MyRDD (sc, 0 , Nil )
816+ val waiter = scheduler.submitJob(testRdd, func, Seq .empty, CallSite .empty,
817+ resultHandler, properties)
818+ sc.listenerBus.waitUntilEmpty(1000L )
819+ assert(assertionError.get() === null )
820+ }
821+
791822 // Helper function to validate state when creating tests for task failures
792823 private def checkStageId (stageId : Int , attempt : Int , stageAttempt : TaskSet ) {
793824 assert(stageAttempt.stageId === stageId)
0 commit comments