
Commit 8092a4d

revert spurious test change; this came from another PR
1 parent a7c5181

File tree

1 file changed (+1, -25 lines)


sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala

Lines changed: 1 addition & 25 deletions
@@ -21,7 +21,6 @@ import java.io.{File, InterruptedIOException, IOException, UncheckedIOException}
 import java.nio.channels.ClosedByInterruptException
 import java.util.concurrent.{CountDownLatch, ExecutionException, TimeoutException, TimeUnit}
 
-import scala.collection.mutable
 import scala.reflect.ClassTag
 import scala.util.control.ControlThrowable
 
@@ -30,7 +29,7 @@ import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.{SparkContext, SparkEnv}
-import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskEnd, SparkListenerTaskStart}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
@@ -246,29 +245,6 @@ class ContinuousSuite extends ContinuousSuiteBase {
 class ContinuousStressSuite extends ContinuousSuiteBase {
   import testImplicits._
 
-  // Continuous processing tasks end asynchronously, so test that they actually end.
-  private val tasksEndedListener = new SparkListener() {
-    val activeTaskIds = mutable.Set[Long]()
-
-    override def onTaskStart(start: SparkListenerTaskStart): Unit = {
-      activeTaskIds.add(start.taskInfo.taskId)
-    }
-
-    override def onTaskEnd(end: SparkListenerTaskEnd): Unit = {
-      activeTaskIds.remove(end.taskInfo.taskId)
-    }
-  }
-  override def beforeEach(): Unit = {
-    spark.sparkContext.addSparkListener(tasksEndedListener)
-  }
-
-  override def afterEach(): Unit = {
-    eventually(timeout(streamingTimeout)) {
-      assert(tasksEndedListener.activeTaskIds.isEmpty)
-    }
-    spark.sparkContext.removeSparkListener(tasksEndedListener)
-  }
-
   test("only one epoch") {
     val df = spark.readStream
       .format("rate")
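For context, the block removed above is a common SparkListener pattern: because continuous-processing tasks end asynchronously, the suite tracked active task IDs and asserted that the set drained after each test. Below is a minimal self-contained sketch of the same idea; the ActiveTaskTracker name and the local-mode driver are illustrative, not part of this commit, and it assumes only that Spark is on the classpath.

import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}
import org.apache.spark.sql.SparkSession

// Tracks tasks that have started but not yet ended. Task-end events arrive
// asynchronously on the listener bus, so an empty set means all tasks finished.
class ActiveTaskTracker extends SparkListener {
  val activeTaskIds = mutable.Set[Long]()

  override def onTaskStart(start: SparkListenerTaskStart): Unit = {
    activeTaskIds.add(start.taskInfo.taskId)
  }

  override def onTaskEnd(end: SparkListenerTaskEnd): Unit = {
    activeTaskIds.remove(end.taskInfo.taskId)
  }
}

// Illustrative driver, not from the commit: register the listener, run a
// small job, then check that no tasks are still marked active.
object ActiveTaskTrackerExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("tracker").getOrCreate()
    val tracker = new ActiveTaskTracker
    spark.sparkContext.addSparkListener(tracker)
    try {
      spark.range(0, 1000).count()  // run a small job so some tasks start and end
      // Crude stand-in for a polling assertion: give the listener bus time
      // to deliver the task-end events before checking the set.
      Thread.sleep(1000)
      assert(tracker.activeTaskIds.isEmpty, s"tasks still active: ${tracker.activeTaskIds}")
    } finally {
      spark.sparkContext.removeSparkListener(tracker)
      spark.stop()
    }
  }
}

In the reverted test code this check ran in beforeEach/afterEach and used ScalaTest's eventually instead of a sleep, retrying the assertion until the listener bus had delivered all task-end events or the streaming timeout elapsed.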

0 commit comments