StreamingSource.scala
@@ -29,9 +29,17 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
private val streamingListener = ssc.progressListener

private def registerGauge[T](name: String, f: StreamingJobProgressListener => T,
-      defaultValue: T) {
+      defaultValue: T): Unit = {
+    registerGaugeWithOption[T](name,
+      (l: StreamingJobProgressListener) => Option(f(streamingListener)), defaultValue)
Contributor:

I think it's better to keep the Option here (and document that defaultValue is used when f returns null). Other places should not have to use Option. This is safer for anyone to use and also minimizes the changes.
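For illustration, the suggestion amounts to keeping the null handling inside registerGauge itself, as the original code above does (a sketch assuming StreamingSource's metricRegistry and streamingListener fields; Option(x) is None when x is null):

    private def registerGauge[T](name: String, f: StreamingJobProgressListener => T,
        defaultValue: T): Unit = {
      metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] {
        // Option(null) is None, so defaultValue is reported whenever f returns null.
        override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue)
      })
    }

Callers then pass a plain StreamingJobProgressListener => T and never deal with Option themselves.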

Contributor Author:

OK, I will revert it and try a better way.

Contributor Author:

Hi TD, what do you mean by "And other places should not have to use Option"? If, taking this line as an example, we change it to

registerGauge("lastCompletedBatch_submissionTime",
    _.lastCompletedBatch.map(_.submissionTime).get, -1L)

then get will throw an exception when there is no completed batch. I'm not sure about the actual meaning; sorry if I've misunderstood anything.
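For reference, that failure mode in a Scala REPL:

    scala> Option.empty[Long].get
    java.util.NoSuchElementException: None.get
      ...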

Contributor Author:

Hi TD, sorry to bother you again. I'm not sure if there's a better way to address this problem; would you mind giving me some hints? Thanks a lot.

Contributor:

I understand the problem. Good catch, I did not realize that. How about this: let's make two versions of registerGauge, one that takes f: StreamingProgressListener => T without any default value, and another that takes f: StreamingProgressListener => Option[T] and the default value. Each version will be used accordingly.
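A minimal sketch of the two proposed variants (again assuming StreamingSource's metricRegistry and streamingListener fields; this illustrates the suggestion, not necessarily the exact code that was committed):

    // For metrics that are always defined: no default value needed.
    private def registerGauge[T](name: String, f: StreamingJobProgressListener => T): Unit = {
      metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] {
        override def getValue: T = f(streamingListener)
      })
    }

    // For metrics that may be absent (e.g. before the first batch completes).
    private def registerGaugeWithOption[T](
        name: String,
        f: StreamingJobProgressListener => Option[T],
        defaultValue: T): Unit = {
      metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] {
        override def getValue: T = f(streamingListener).getOrElse(defaultValue)
      })
    }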

Contributor Author:

OK, got it, I will change the code as you suggested.

}

+  private def registerGaugeWithOption[T](
+      name: String,
+      f: StreamingJobProgressListener => Option[T],
+      defaultValue: T): Unit = {
    metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] {
-      override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue)
+      override def getValue: T = f(streamingListener).getOrElse(defaultValue)
})
}

@@ -41,6 +49,12 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
// Gauge for number of total completed batches
registerGauge("totalCompletedBatches", _.numTotalCompletedBatches, 0L)

+    // Gauge for number of total received records
+    registerGauge("totalReceivedRecords", _.numTotalReceivedRecords, 0L)
+
+    // Gauge for number of total processed records
+    registerGauge("totalProcessedRecords", _.numTotalProcessedRecords, 0L)

// Gauge for number of unprocessed batches
registerGauge("unprocessedBatches", _.numUnprocessedBatches, 0L)

@@ -55,19 +69,30 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {

// Gauge for last completed batch, useful for monitoring the streaming job's running status,
// displayed data -1 for any abnormal condition.
registerGauge("lastCompletedBatch_submissionTime",
_.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
registerGauge("lastCompletedBatch_processStartTime",
_.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
registerGauge("lastCompletedBatch_processEndTime",
_.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
registerGaugeWithOption("lastCompletedBatch_submissionTime",
_.lastCompletedBatch.map(_.submissionTime), -1L)
registerGaugeWithOption("lastCompletedBatch_processingStartTime",
_.lastCompletedBatch.flatMap(_.processingStartTime), -1L)
registerGaugeWithOption("lastCompletedBatch_processingEndTime",
_.lastCompletedBatch.flatMap(_.processingEndTime), -1L)

+    // Gauge for last completed batch's delay information.
+    registerGaugeWithOption("lastCompletedBatch_processingDelay",
+      _.lastCompletedBatch.flatMap(_.processingDelay), -1L)
+    registerGaugeWithOption("lastCompletedBatch_schedulingDelay",
+      _.lastCompletedBatch.flatMap(_.schedulingDelay), -1L)
+    registerGaugeWithOption("lastCompletedBatch_totalDelay",
+      _.lastCompletedBatch.flatMap(_.totalDelay), -1L)

// Gauge for last received batch, useful for monitoring the streaming job's running status,
// displayed data -1 for any abnormal condition.
registerGauge("lastReceivedBatch_submissionTime",
_.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
registerGauge("lastReceivedBatch_processStartTime",
_.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
registerGauge("lastReceivedBatch_processEndTime",
_.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
registerGaugeWithOption("lastReceivedBatch_submissionTime",
_.lastCompletedBatch.map(_.submissionTime), -1L)
registerGaugeWithOption("lastReceivedBatch_processingStartTime",
_.lastCompletedBatch.flatMap(_.processingStartTime), -1L)
registerGaugeWithOption("lastReceivedBatch_processingEndTime",
_.lastCompletedBatch.flatMap(_.processingEndTime), -1L)

// Gauge for last received batch records.
registerGauge("lastReceivedBatch_records", _.lastReceivedBatchRecords.values.sum, 0L)
}
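All of these gauges follow the same Option-with-default pattern: report the value when it exists, and the -1 sentinel otherwise. A self-contained sketch of the pattern, using a hypothetical simplified Batch type rather than Spark's BatchInfo:

    case class Batch(submissionTime: Long, processingStartTime: Option[Long])

    def gaugeValue(lastCompletedBatch: Option[Batch]): Long =
      lastCompletedBatch.map(_.submissionTime).getOrElse(-1L)

    // No completed batch yet: the gauge reports -1 instead of throwing.
    assert(gaugeValue(None) == -1L)
    assert(gaugeValue(Some(Batch(1234L, None))) == 1234L)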
StreamingJobProgressListener.scala
@@ -25,7 +25,6 @@ import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
import org.apache.spark.streaming.scheduler.BatchInfo
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
import org.apache.spark.util.Distribution
-import org.apache.spark.Logging


private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
@@ -36,6 +35,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
private val completedaBatchInfos = new Queue[BatchInfo]
private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
private var totalCompletedBatches = 0L
+  private var totalReceivedRecords = 0L
+  private var totalProcessedRecords = 0L
private val receiverInfos = new HashMap[Int, ReceiverInfo]

val batchDuration = ssc.graph.batchDuration.milliseconds
@@ -65,6 +66,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) = synchronized {
runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo
waitingBatchInfos.remove(batchStarted.batchInfo.batchTime)

+    batchStarted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
+      totalReceivedRecords += infos.map(_.numRecords).sum
+    }
}

override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {
@@ -73,6 +78,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
completedaBatchInfos.enqueue(batchCompleted.batchInfo)
if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue()
totalCompletedBatches += 1L

+    batchCompleted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
+      totalProcessedRecords += infos.map(_.numRecords).sum
+    }
}

def numReceivers = synchronized {
@@ -83,6 +92,14 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
totalCompletedBatches
}

+  def numTotalReceivedRecords: Long = synchronized {
+    totalReceivedRecords
+  }
+
+  def numTotalProcessedRecords: Long = synchronized {
+    totalProcessedRecords
+  }

def numUnprocessedBatches: Long = synchronized {
waitingBatchInfos.size + runningBatchInfos.size
}
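The record counting added above just sums the per-receiver block counts for each batch. As a standalone sketch of that arithmetic, with hypothetical simplified types in place of Spark's actual ReceivedBlockInfo:

    // receivedBlockInfo maps each input stream id to the block infos received for it.
    case class BlockInfo(numRecords: Long)

    val receivedBlockInfo: Map[Int, Seq[BlockInfo]] = Map(
      0 -> Seq(BlockInfo(10), BlockInfo(5)), // stream 0: two blocks
      1 -> Seq(BlockInfo(7))                 // stream 1: one block
    )

    var totalReceivedRecords = 0L
    receivedBlockInfo.foreach { case (_, infos) =>
      totalReceivedRecords += infos.map(_.numRecords).sum
    }
    assert(totalReceivedRecords == 22L)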