@@ -62,6 +62,14 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
6262 registerGauge(" lastCompletedBatch_processEndTime" ,
6363 _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(- 1L ), - 1L )
6464
65+ // Gauge for last completed batch's delay information.
66+ registerGauge(" lastCompletedBatch_processTime" ,
67+ _.lastCompletedBatch.flatMap(_.processingDelay).getOrElse(0L ), - 1L )
68+ registerGauge(" lastCompletedBatch_schedulingDelay" ,
69+ _.lastCompletedBatch.flatMap(_.schedulingDelay).getOrElse(0L ), - 1L )
70+ registerGauge(" lastCompletedBatch_totalDelay" ,
71+ _.lastCompletedBatch.flatMap(_.totalDelay).getOrElse(0L ), - 1L )
72+
6573 // Gauge for last received batch, useful for monitoring the streaming job's running status,
6674 // displayed data -1 for any abnormal condition.
6775 registerGauge(" lastReceivedBatch_submissionTime" ,
@@ -70,4 +78,14 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
7078 _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(- 1L ), - 1L )
7179 registerGauge(" lastReceivedBatch_processEndTime" ,
7280 _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(- 1L ), - 1L )
81+
82+ // Gauge for last received batch records and total received batch records.
83+ private var totalReceivedBatchRecords : Long = 0L
84+ def getTotalReceivedBatchRecords (listener : StreamingJobProgressListener ): Long = {
85+ totalReceivedBatchRecords += listener.lastReceivedBatchRecords.values.sum
86+ totalReceivedBatchRecords
87+ }
88+
89+ registerGauge(" lastReceivedBatchRecords" , _.lastReceivedBatchRecords.values.sum, 0L )
90+ registerGauge(" totalReceivedBatchRecords" , getTotalReceivedBatchRecords, 0L )
7391}
0 commit comments