Skip to content

Commit

Permalink
Add support many missing versions
Browse files Browse the repository at this point in the history
2.2.1
2.2.2
2.3.0
2.3.1
2.4.0
2.4.1
2.5.0
2.5.1
2.6.0
  • Loading branch information
VHAISDMelhiT authored and VHAISDMelhiT committed Aug 21, 2020
1 parent eba0b95 commit 6854494
Show file tree
Hide file tree
Showing 12 changed files with 203 additions and 18 deletions.
25 changes: 25 additions & 0 deletions app/controllers/Logkafka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,24 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana
LogkafkaNewConfigs.configMaps(Kafka_2_1_1).map{case(k,v) => LKConfig(k,Some(v))}.toList)
val kafka_2_2_0_Default = CreateLogkafka("","",
LogkafkaNewConfigs.configMaps(Kafka_2_2_0).map{case(k,v) => LKConfig(k,Some(v))}.toList)
val kafka_2_2_1_Default = CreateLogkafka("","",
LogkafkaNewConfigs.configMaps(Kafka_2_2_1).map{case(k,v) => LKConfig(k,Some(v))}.toList)
val kafka_2_2_2_Default = CreateLogkafka("","",
LogkafkaNewConfigs.configMaps(Kafka_2_2_2).map{case(k,v) => LKConfig(k,Some(v))}.toList)
val kafka_2_3_0_Default = CreateLogkafka("","",
LogkafkaNewConfigs.configMaps(Kafka_2_3_0).map{case(k,v) => LKConfig(k,Some(v))}.toList)
val kafka_2_3_1_Default = CreateLogkafka("","",
LogkafkaNewConfigs.configMaps(Kafka_2_3_1).map{case(k,v) => LKConfig(k,Some(v))}.toList)
val kafka_2_4_0_Default = CreateLogkafka("","",
LogkafkaNewConfigs.configMaps(Kafka_2_4_0).map{case(k,v) => LKConfig(k,Some(v))}.toList)
val kafka_2_4_1_Default = CreateLogkafka("","",
LogkafkaNewConfigs.configMaps(Kafka_2_4_1).map{case(k,v) => LKConfig(k,Some(v))}.toList)
val kafka_2_5_0_Default = CreateLogkafka("","",
LogkafkaNewConfigs.configMaps(Kafka_2_5_0).map{case(k,v) => LKConfig(k,Some(v))}.toList)
val kafka_2_5_1_Default = CreateLogkafka("","",
LogkafkaNewConfigs.configMaps(Kafka_2_5_1).map{case(k,v) => LKConfig(k,Some(v))}.toList)
val kafka_2_6_0_Default = CreateLogkafka("","",
LogkafkaNewConfigs.configMaps(Kafka_2_6_0).map{case(k,v) => LKConfig(k,Some(v))}.toList)

val defaultCreateForm = Form(
mapping(
Expand Down Expand Up @@ -159,7 +175,15 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana
case Kafka_2_1_0 => (defaultCreateForm.fill(kafka_2_1_0_Default), clusterContext)
case Kafka_2_1_1 => (defaultCreateForm.fill(kafka_2_1_1_Default), clusterContext)
case Kafka_2_2_0 => (defaultCreateForm.fill(kafka_2_2_0_Default), clusterContext)
case Kafka_2_2_1 => (defaultCreateForm.fill(kafka_2_2_1_Default), clusterContext)
case Kafka_2_2_2 => (defaultCreateForm.fill(kafka_2_2_2_Default), clusterContext)
case Kafka_2_3_0 => (defaultCreateForm.fill(kafka_2_3_0_Default), clusterContext)
case Kafka_2_3_1 => (defaultCreateForm.fill(kafka_2_3_1_Default), clusterContext)
case Kafka_2_4_0 => (defaultCreateForm.fill(kafka_2_4_0_Default), clusterContext)
case Kafka_2_4_1 => (defaultCreateForm.fill(kafka_2_4_1_Default), clusterContext)
case Kafka_2_5_0 => (defaultCreateForm.fill(kafka_2_5_0_Default), clusterContext)
case Kafka_2_5_1 => (defaultCreateForm.fill(kafka_2_5_1_Default), clusterContext)
case Kafka_2_6_0 => (defaultCreateForm.fill(kafka_2_6_0_Default), clusterContext)
}
}
}
Expand Down Expand Up @@ -265,6 +289,7 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana
case Kafka_2_1_1 => LogkafkaNewConfigs.configNames(Kafka_2_1_1).map(n => (n,LKConfig(n,None))).toMap
case Kafka_2_2_0 => LogkafkaNewConfigs.configNames(Kafka_2_2_0).map(n => (n,LKConfig(n,None))).toMap
case Kafka_2_4_0 => LogkafkaNewConfigs.configNames(Kafka_2_4_0).map(n => (n,LKConfig(n,None))).toMap
case Kafka_2_4_1 => LogkafkaNewConfigs.configNames(Kafka_2_4_1).map(n => (n,LKConfig(n,None))).toMap
}
val identityOption = li.identityMap.get(log_path)
if (identityOption.isDefined) {
Expand Down
24 changes: 24 additions & 0 deletions app/controllers/Topic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,15 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager
val kafka_2_1_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_1_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)
val kafka_2_1_1_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_1_1).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)
val kafka_2_2_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_2_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)
val kafka_2_2_1_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_2_1).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)
val kafka_2_2_2_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_2_2).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)
val kafka_2_3_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_3_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)
val kafka_2_3_1_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_3_1).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)
val kafka_2_4_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_4_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)
val kafka_2_4_1_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_4_1).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)
val kafka_2_5_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_5_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)
val kafka_2_5_1_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_5_1).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)
val kafka_2_6_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_6_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)

val defaultCreateForm = Form(
mapping(
Expand Down Expand Up @@ -170,7 +178,15 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager
case Kafka_2_1_0 => (defaultCreateForm.fill(kafka_2_1_0_Default), clusterContext)
case Kafka_2_1_1 => (defaultCreateForm.fill(kafka_2_1_1_Default), clusterContext)
case Kafka_2_2_0 => (defaultCreateForm.fill(kafka_2_2_0_Default), clusterContext)
case Kafka_2_2_1 => (defaultCreateForm.fill(kafka_2_2_1_Default), clusterContext)
case Kafka_2_2_2 => (defaultCreateForm.fill(kafka_2_2_2_Default), clusterContext)
case Kafka_2_3_0 => (defaultCreateForm.fill(kafka_2_3_0_Default), clusterContext)
case Kafka_2_3_1 => (defaultCreateForm.fill(kafka_2_3_1_Default), clusterContext)
case Kafka_2_4_0 => (defaultCreateForm.fill(kafka_2_4_0_Default), clusterContext)
case Kafka_2_4_1 => (defaultCreateForm.fill(kafka_2_4_1_Default), clusterContext)
case Kafka_2_5_0 => (defaultCreateForm.fill(kafka_2_5_0_Default), clusterContext)
case Kafka_2_5_1 => (defaultCreateForm.fill(kafka_2_5_1_Default), clusterContext)
case Kafka_2_6_0 => (defaultCreateForm.fill(kafka_2_6_0_Default), clusterContext)
}
}
}
Expand Down Expand Up @@ -421,7 +437,15 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager
case Kafka_2_1_0 => TopicConfigs.configNamesAndDoc(Kafka_2_1_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
case Kafka_2_1_1 => TopicConfigs.configNamesAndDoc(Kafka_2_1_1).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
case Kafka_2_2_0 => TopicConfigs.configNamesAndDoc(Kafka_2_2_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
case Kafka_2_2_1 => TopicConfigs.configNamesAndDoc(Kafka_2_2_1).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
case Kafka_2_2_2 => TopicConfigs.configNamesAndDoc(Kafka_2_2_2).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
case Kafka_2_3_0 => TopicConfigs.configNamesAndDoc(Kafka_2_3_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
case Kafka_2_3_1 => TopicConfigs.configNamesAndDoc(Kafka_2_3_1).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
case Kafka_2_4_0 => TopicConfigs.configNamesAndDoc(Kafka_2_4_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
case Kafka_2_4_1 => TopicConfigs.configNamesAndDoc(Kafka_2_4_1).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
case Kafka_2_5_0 => TopicConfigs.configNamesAndDoc(Kafka_2_5_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
case Kafka_2_5_1 => TopicConfigs.configNamesAndDoc(Kafka_2_5_1).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
case Kafka_2_6_0 => TopicConfigs.configNamesAndDoc(Kafka_2_6_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
}
val updatedConfigMap = ti.config.toMap
val updatedConfigList = defaultConfigs.map {
Expand Down
2 changes: 1 addition & 1 deletion app/kafka/manager/actor/cluster/KafkaStateActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ class KafkaAdminClient(context: => ActorContext, adminClientActorPath: ActorPath


object KafkaManagedOffsetCache {
val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1, Kafka_0_10_1_0, Kafka_0_10_1_1, Kafka_0_10_2_0, Kafka_0_10_2_1, Kafka_0_11_0_0, Kafka_0_11_0_2, Kafka_1_0_0, Kafka_1_0_1, Kafka_1_1_0, Kafka_1_1_1, Kafka_2_0_0, Kafka_2_1_0, Kafka_2_1_1, Kafka_2_2_0, Kafka_2_4_0)
val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1, Kafka_0_10_1_0, Kafka_0_10_1_1, Kafka_0_10_2_0, Kafka_0_10_2_1, Kafka_0_11_0_0, Kafka_0_11_0_2, Kafka_1_0_0, Kafka_1_0_1, Kafka_1_1_0, Kafka_1_1_1, Kafka_2_0_0, Kafka_2_1_0, Kafka_2_1_1, Kafka_2_2_0, Kafka_2_2_1, Kafka_2_2_2, Kafka_2_3_0, Kafka_2_2_1, Kafka_2_4_0, Kafka_2_4_1, Kafka_2_5_0, Kafka_2_5_1, Kafka_2_6_0)
val ConsumerOffsetTopic = "__consumer_offsets"

def isSupported(version: KafkaVersion) : Boolean = {
Expand Down
42 changes: 41 additions & 1 deletion app/kafka/manager/model/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,42 @@ case object Kafka_2_2_0 extends KafkaVersion {
override def toString = "2.2.0"
}

case object Kafka_2_2_1 extends KafkaVersion {
override def toString = "2.2.1"
}

case object Kafka_2_2_2 extends KafkaVersion {
override def toString = "2.2.2"
}

case object Kafka_2_3_0 extends KafkaVersion {
override def toString = "2.3.0"
}

case object Kafka_2_3_1 extends KafkaVersion {
override def toString = "2.3.1"
}

case object Kafka_2_4_0 extends KafkaVersion {
override def toString = "2.4.0"
}

case object Kafka_2_4_1 extends KafkaVersion {
override def toString = "2.4.1"
}

case object Kafka_2_5_0 extends KafkaVersion {
override def toString = "2.5.0"
}

case object Kafka_2_5_1 extends KafkaVersion {
override def toString = "2.5.1"
}

case object Kafka_2_6_0 extends KafkaVersion {
override def toString = "2.6.0"
}

object KafkaVersion {
val supportedVersions: Map[String,KafkaVersion] = Map(
"0.8.1.1" -> Kafka_0_8_1_1,
Expand All @@ -129,7 +161,15 @@ object KafkaVersion {
"2.1.0" -> Kafka_2_1_0,
"2.1.1" -> Kafka_2_1_1,
"2.2.0" -> Kafka_2_2_0,
"2.4.0" -> Kafka_2_4_0
"2.2.1" -> Kafka_2_2_1,
"2.2.2" -> Kafka_2_2_2,
"2.3.0" -> Kafka_2_3_0,
"2.3.1" -> Kafka_2_3_1,
"2.4.0" -> Kafka_2_4_0,
"2.4.1" -> Kafka_2_4_1,
"2.5.0" -> Kafka_2_5_0,
"2.5.1" -> Kafka_2_5_1,
"2.6.0" -> Kafka_2_6_0
)

val formSelectList : IndexedSeq[(String,String)] = supportedVersions.toIndexedSeq.filterNot(_._1.contains("beta")).map(t => (t._1,t._2.toString)).sortWith((a, b) => sortVersion(a._1, b._1))
Expand Down
10 changes: 9 additions & 1 deletion app/kafka/manager/utils/LogkafkaNewConfigs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,15 @@ object LogkafkaNewConfigs {
Kafka_2_1_0 -> logkafka82.LogConfig,
Kafka_2_1_1 -> logkafka82.LogConfig,
Kafka_2_2_0 -> logkafka82.LogConfig,
Kafka_2_4_0 -> logkafka82.LogConfig
Kafka_2_2_1 -> logkafka82.LogConfig,
Kafka_2_2_2 -> logkafka82.LogConfig,
Kafka_2_3_0 -> logkafka82.LogConfig,
Kafka_2_3_1 -> logkafka82.LogConfig,
Kafka_2_4_0 -> two40.LogConfig,
Kafka_2_4_1 -> two40.LogConfig,
Kafka_2_5_0 -> two40.LogConfig,
Kafka_2_5_1 -> two40.LogConfig,
Kafka_2_6_0 -> two40.LogConfig
)

def configNames(version: KafkaVersion) : Set[String] = {
Expand Down
10 changes: 9 additions & 1 deletion app/kafka/manager/utils/TopicConfigs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,15 @@ object TopicConfigs {
Kafka_2_1_0 -> two00.LogConfig,
Kafka_2_1_1 -> two00.LogConfig,
Kafka_2_2_0 -> two00.LogConfig,
Kafka_2_4_0 -> two00.LogConfig
Kafka_2_2_1 -> two00.LogConfig,
Kafka_2_2_2 -> two00.LogConfig,
Kafka_2_3_0 -> two00.LogConfig,
Kafka_2_3_1 -> two00.LogConfig,
Kafka_2_4_0 -> two40.LogConfig,
Kafka_2_4_1 -> two40.LogConfig,
Kafka_2_5_0 -> two40.LogConfig,
Kafka_2_5_1 -> two40.LogConfig,
Kafka_2_6_0 -> two40.LogConfig
)

def configNames(version: KafkaVersion): Seq[String] = {
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ libraryDependencies ++= Seq(
"org.slf4j" % "log4j-over-slf4j" % "1.7.25",
"com.adrianhurt" %% "play-bootstrap" % "1.4-P26-B4" exclude("com.typesafe.play", "*"),
"org.clapper" %% "grizzled-slf4j" % "1.3.3",
"org.apache.kafka" %% "kafka" % "2.4.0" exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(),
"org.apache.kafka" %% "kafka" % "2.6.0" exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(),
"org.apache.kafka" % "kafka-streams" % "2.2.0",
"com.beachape" %% "enumeratum" % "1.5.13",
"com.github.ben-manes.caffeine" % "caffeine" % "2.6.2",
Expand Down
2 changes: 1 addition & 1 deletion test/controller/api/TestKafkaStateCheck.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class TestKafkaStateCheck extends CuratorAwareTest with KafkaServerInTest with M

private[this] def createCluster() = {
val future = kafkaManagerContext.get.getKafkaManager.addCluster(
testClusterName, "2.4.0", kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManagerContext.get.getKafkaManager.defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None
testClusterName, "2.6.0", kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManagerContext.get.getKafkaManager.defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None
)
val result = Await.result(future, duration)
result.toEither.left.foreach(apiError => sys.error(apiError.msg))
Expand Down
6 changes: 3 additions & 3 deletions test/kafka/manager/TestKafkaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest {
}

test("add cluster") {
val future = kafkaManager.addCluster("dev","2.4.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManager.defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None)
val future = kafkaManager.addCluster("dev","2.6.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManager.defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None)
val result = Await.result(future,duration)
assert(result.isRight === true)
Thread.sleep(2000)
Expand Down Expand Up @@ -418,7 +418,7 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest {
}

test("update cluster zkhost") {
val future = kafkaManager.updateCluster("dev","2.4.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxSsl = false, jmxPass = None, tuning = Option(defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
val future = kafkaManager.updateCluster("dev","2.6.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxSsl = false, jmxPass = None, tuning = Option(defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
val result = Await.result(future,duration)
assert(result.isRight === true)

Expand Down Expand Up @@ -475,7 +475,7 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest {
}

test("update cluster logkafka enabled and activeOffsetCache enabled") {
val future = kafkaManager.updateCluster("dev","2.4.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
val future = kafkaManager.updateCluster("dev","2.6.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
val result = Await.result(future,duration)
assert(result.isRight === true)

Expand Down
Loading

0 comments on commit 6854494

Please sign in to comment.