
Commit a3d05e0

revert previous change and fix bug in another way
1. Revert the previous change.
2. Add a batchTimesWithNoJob set to record batch times with no jobs.
3. Add an aggregation (merge) method for input info.
1 parent 83351b8 commit a3d05e0

3 files changed (+42, -13 lines)


streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala

Lines changed: 28 additions & 0 deletions
@@ -39,6 +39,12 @@ case class StreamInputInfo(
 
   def metadataDescription: Option[String] =
     metadata.get(StreamInputInfo.METADATA_KEY_DESCRIPTION).map(_.toString)
+
+  def merge(other: StreamInputInfo): StreamInputInfo = {
+    require(other.inputStreamId == inputStreamId,
+      "Can't merge two StreamInputInfo with different id")
+    StreamInputInfo(inputStreamId, numRecords + other.numRecords, metadata ++ other.metadata)
+  }
 }
 
 @DeveloperApi
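
For intuition, here is a minimal sketch of what merge does. The stream id and record counts below are invented, and it assumes StreamInputInfo's case-class constructor with a defaulted metadata map:

// Hypothetical values: merge sums record counts and unions metadata maps.
val first = StreamInputInfo(inputStreamId = 0, numRecords = 100)
val second = StreamInputInfo(inputStreamId = 0, numRecords = 250)

val merged = first.merge(second)
assert(merged.numRecords == 350)

// Merging infos from different streams fails fast: the require(...) above
// throws IllegalArgumentException when the inputStreamIds differ.

Since the metadata maps are combined with ++, a key present in both infos takes the value from other, the right-hand operand.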
@@ -79,6 +85,28 @@ private[streaming] class InputInfoTracker(ssc: StreamingContext) extends Logging
     inputInfos.map(_.toMap).getOrElse(Map[Int, StreamInputInfo]())
   }
 
+  /**
+   * Get all the input streams' information for the specified batch times and
+   * merge the results together.
+   */
+  def getInfo(batchTimes: Iterable[Time]): Map[Int, StreamInputInfo] = synchronized {
+    val inputInfosSet = batchTimes.map { batchTime =>
+      val inputInfos = batchTimeToInputInfos.get(batchTime)
+      inputInfos.getOrElse(mutable.Map[Int, StreamInputInfo]())
+    }
+
+    val aggregatedInputInfos = mutable.Map[Int, StreamInputInfo]()
+    inputInfosSet.foreach(inputInfos => inputInfos.foreach { case (id, info) =>
+      val currentInfo = aggregatedInputInfos.get(id)
+      if (currentInfo.isEmpty) {
+        aggregatedInputInfos(id) = info
+      } else {
+        aggregatedInputInfos(id) = currentInfo.get.merge(info)
+      }
+    })
+    aggregatedInputInfos.toMap
+  }
+
   /** Cleanup the tracked input information older than threshold batch time */
   def cleanup(batchThreshTime: Time): Unit = synchronized {
     val timesToCleanup = batchTimeToInputInfos.keys.filter(_ < batchThreshTime)
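
To see the new getInfo(batchTimes) aggregation in isolation, here is a stand-alone model with StreamInputInfo reduced to a plain streamId -> record-count map; the batch contents are invented:

// Per-batch input info, keyed by stream id (values are record counts).
val batch1 = Map(0 -> 100L, 1 -> 10L) // a batch that produced no jobs
val batch2 = Map(0 -> 250L)           // the current batch

// Same effect as the foreach/merge loop above: group by stream id, sum counts.
val aggregated = (batch1.toSeq ++ batch2.toSeq)
  .groupBy { case (id, _) => id }
  .map { case (id, infos) => id -> infos.map(_._2).sum }

assert(aggregated == Map(0 -> 350L, 1 -> 10L))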

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala

Lines changed: 14 additions & 1 deletion
@@ -17,6 +17,7 @@
 
 package org.apache.spark.streaming.scheduler
 
+import scala.collection.mutable
 import scala.util.{Failure, Success, Try}
 
 import org.apache.spark.SparkEnv
@@ -77,6 +78,10 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
   // last batch whose completion, checkpointing and metadata cleanup has been completed
   private var lastProcessedBatch: Time = null
 
+  // For some batch times, a JobSet with no jobs will be submitted. We record such batch times
+  // here in order to correct the input info of a later JobSet that does have jobs.
+  private var batchTimesWithNoJob: mutable.HashSet[Time] = mutable.HashSet[Time]()
+
   /** Start generation of jobs */
   def start(): Unit = synchronized {
     if (eventLoop != null) return // generator has already been started
@@ -249,7 +254,15 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
       graph.generateJobs(time) // generate jobs using allocated block
     } match {
       case Success(jobs) =>
-        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
+        val streamIdToInputInfos = if (jobs.isEmpty) {
+          batchTimesWithNoJob.add(time)
+          Map.empty[Int, StreamInputInfo]
+        } else {
+          batchTimesWithNoJob.add(time)
+          val inputInfo = jobScheduler.inputInfoTracker.getInfo(batchTimesWithNoJob)
+          batchTimesWithNoJob.clear()
+          inputInfo
+        }
         jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
       case Failure(e) =>
         jobScheduler.reportError("Error generating jobs for time " + time, e)
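
The generateJobs change follows an accumulate-then-flush pattern: batch times that produce no jobs are remembered, and the next batch that does produce jobs reports the merged input info for all of them. A simplified stand-alone model, with Time reduced to Long and the tracker stubbed as a function (names here are illustrative, not Spark API):

import scala.collection.mutable

// Batches that produced no jobs, waiting to be folded into a later batch.
val batchTimesWithNoJob = mutable.HashSet[Long]()

def inputInfoFor(
    time: Long,
    hasJobs: Boolean,
    getInfo: Iterable[Long] => Map[Int, Long]): Map[Int, Long] = {
  if (!hasJobs) {
    batchTimesWithNoJob.add(time) // defer: report this batch's input info later
    Map.empty                     // the empty JobSet carries no input info
  } else {
    batchTimesWithNoJob.add(time) // include the current batch itself
    val info = getInfo(batchTimesWithNoJob) // aggregate across all deferred batches
    batchTimesWithNoJob.clear()   // flushed; start accumulating afresh
    info
  }
}

So if the batches at times 1000 and 2000 generate no jobs, the batch at 3000 submits a JobSet whose input info covers all three batch times, which keeps the record counts on the Streaming UI accurate.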

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala

Lines changed: 0 additions & 12 deletions
@@ -21,7 +21,6 @@ import java.util.Properties
 import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.HashSet
 import scala.util.Failure
 
 import org.apache.commons.lang.SerializationUtils
@@ -65,8 +64,6 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
 
   private var eventLoop: EventLoop[JobSchedulerEvent] = null
 
-  private val inputInfoMissedTimes = HashSet[Time]()
-
   def start(): Unit = synchronized {
     if (eventLoop != null) return // scheduler has already been started
 
@@ -142,7 +139,6 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   def submitJobSet(jobSet: JobSet) {
     if (jobSet.jobs.isEmpty) {
       logInfo("No jobs added for time " + jobSet.time)
-      inputInfoMissedTimes.add(jobSet.time)
     } else {
       listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
       jobSets.put(jobSet.time, jobSet)
@@ -197,14 +193,6 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
       listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
       logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
       if (jobSet.hasCompleted) {
-        // submit fake BatchCompleted event to show missing inputInfo on Streaming UI
-        inputInfoMissedTimes.foreach (time => {
-          val streamIdToInputInfos = inputInfoTracker.getInfo(time)
-          val fakeJobSet = JobSet(time, Seq(), streamIdToInputInfos)
-          listenerBus.post(StreamingListenerBatchCompleted(fakeJobSet.toBatchInfo))
-        })
-        inputInfoMissedTimes.clear()
-
         jobSets.remove(jobSet.time)
         jobGenerator.onBatchCompletion(jobSet.time)
         logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
