Skip to content

Commit da059c6

Browse files
author
Lokesh Balakrishnan
committed
adding API to fetch consumers.
1 parent bb3573f commit da059c6

File tree

2 files changed

+12
-0
lines changed

2 files changed

+12
-0
lines changed

app/controllers/api/KafkaStateCheck.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,4 +118,15 @@ class KafkaStateCheck (val messagesApi: MessagesApi, val kafkaManagerContext: Ka
118118
}
119119
Future.sequence(cosumdTopicSummary).map(_.toMap)
120120
}
121+
122+
def consumersSummaryAction(cluster: String) = Action.async { implicit request =>
123+
implicit val formats = org.json4s.DefaultFormats
124+
kafkaManager.getConsumerListExtended(cluster).map { errorOrConsumersSummary =>
125+
errorOrConsumersSummary.fold(
126+
error => BadRequest(Json.obj("msg" -> error.msg)),
127+
consumersSummary => Ok(Serialization.writePretty("consumers" -> consumersSummary.list.map{case ((consumer, consumerType), consumerIdentity) => (consumer, consumerType.toString())}.toMap))
128+
)
129+
}
130+
}
131+
121132
}

conf/routes

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ GET /api/status/:c/:t/underReplicatedPartitions controllers.api.Kafk
6060
GET /api/status/:c/:t/unavailablePartitions controllers.api.KafkaStateCheck.unavailablePartitions(c:String,t:String)
6161
GET /api/status/:cluster/:consumer/:topic/topicSummary controllers.api.KafkaStateCheck.topicSummaryAction(cluster:String, consumer:String, topic:String, consumerType:String)
6262
GET /api/status/:cluster/:consumer/groupSummary controllers.api.KafkaStateCheck.groupSummaryAction(cluster:String, consumer:String, consumerType:String)
63+
GET /api/status/:cluster/consumersSummary controllers.api.KafkaStateCheck.consumersSummaryAction(cluster:String)
6364

6465
# Versioned Assets
6566
GET /vassets/*file controllers.Assets.versioned(path="/public", file: Asset)

0 commit comments

Comments
 (0)