Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import java.io._
import java.net.URI
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.Path
import org.json4s.JsonAST.JValue
import org.json4s.jackson.JsonMethods._
import org.scalatest.BeforeAndAfter

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.{CompressionCodec, LZ4CompressionCodec}
import org.apache.spark.util.{JsonProtocol, JsonProtocolSuite, Utils}
Expand Down Expand Up @@ -62,7 +64,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp

val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)
val logData = fileSystem.open(logFilePath)
val eventMonster = new EventMonster(conf)
val eventMonster = new EventBufferingListener
try {
val replayer = new ReplayListenerBus()
replayer.addListener(eventMonster)
Expand Down Expand Up @@ -108,7 +110,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)
val replayer = new ReplayListenerBus()

val eventMonster = new EventMonster(conf)
val eventMonster = new EventBufferingListener
replayer.addListener(eventMonster)

// Verify the replay returns the events given the input maybe truncated.
Expand Down Expand Up @@ -145,7 +147,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp

val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)
val logData = fileSystem.open(logFilePath)
val eventMonster = new EventMonster(conf)
val eventMonster = new EventBufferingListener
try {
val replayer = new ReplayListenerBus()
replayer.addListener(eventMonster)
Expand Down Expand Up @@ -207,7 +209,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp

// Replay events
val logData = EventLoggingListener.openEventLog(eventLog.getPath(), fileSystem)
val eventMonster = new EventMonster(conf)
val eventMonster = new EventBufferingListener
try {
val replayer = new ReplayListenerBus()
replayer.addListener(eventMonster)
Expand All @@ -219,11 +221,11 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
// Verify the same events are replayed in the same order
assert(sc.eventLogger.isDefined)
val originalEvents = sc.eventLogger.get.loggedEvents
.map(JsonProtocol.sparkEventFromJson(_))
val replayedEvents = eventMonster.loggedEvents
.map(JsonProtocol.sparkEventFromJson(_))
originalEvents.zip(replayedEvents).foreach { case (e1, e2) =>
// Don't compare the JSON here because accumulators in StageInfo may be out of order
JsonProtocolSuite.assertEquals(
JsonProtocol.sparkEventFromJson(e1), JsonProtocol.sparkEventFromJson(e2))
JsonProtocolSuite.assertEquals(e1, e1)
}
}

Expand All @@ -235,21 +237,15 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp

/**
* A simple listener that buffers all the events it receives.
*
* The event buffering functionality must be implemented within EventLoggingListener itself.
* This is because of the following race condition: the event may be mutated between being
* processed by one listener and being processed by another. Thus, in order to establish
* a fair comparison between the original events and the replayed events, both functionalities
* must be implemented within one listener (i.e. the EventLoggingListener).
*
* This child listener inherits only the event buffering functionality, but does not actually
* log the events.
*/
private class EventMonster(conf: SparkConf)
extends EventLoggingListener("test", None, new URI("testdir"), conf) {
private class EventBufferingListener extends SparkFirehoseListener {

override def start() { }
private[scheduler] val loggedEvents = new ArrayBuffer[JValue]

override def onEvent(event: SparkListenerEvent) {
val eventJson = JsonProtocol.sparkEventToJson(event)
loggedEvents += eventJson
}
}

/*
Expand Down