From a3845c0e28b506bc6b5dea735dbc8584e8743ce7 Mon Sep 17 00:00:00 2001 From: smartloli Date: Sat, 19 Jan 2019 00:09:19 +0800 Subject: [PATCH] fixed create & delete topic --- .../eagle/common/constant/JmxConstants.java | 5 ++ .../eagle/core/factory/KafkaServiceImpl.java | 51 +++++++++---------- .../eagle/core/factory/ZkServiceImpl.java | 7 ++- 3 files changed, 35 insertions(+), 28 deletions(-) diff --git a/kafka-eagle-common/src/main/java/org/smartloli/kafka/eagle/common/constant/JmxConstants.java b/kafka-eagle-common/src/main/java/org/smartloli/kafka/eagle/common/constant/JmxConstants.java index aee57da3..402ff9e3 100644 --- a/kafka-eagle-common/src/main/java/org/smartloli/kafka/eagle/common/constant/JmxConstants.java +++ b/kafka-eagle-common/src/main/java/org/smartloli/kafka/eagle/common/constant/JmxConstants.java @@ -35,6 +35,11 @@ public interface KafkaNetWork { // TODO } + public interface KafkaServer8{ + public static final String version = "kafka.common:type=AppInfo,name=Version"; + public static final String value = "Value"; + } + public interface KafkaServer { class BrokerTopicMetrics { public static String bytesInPerSec = "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec"; diff --git a/kafka-eagle-core/src/main/java/org/smartloli/kafka/eagle/core/factory/KafkaServiceImpl.java b/kafka-eagle-core/src/main/java/org/smartloli/kafka/eagle/core/factory/KafkaServiceImpl.java index 6605bc34..53bebc5a 100644 --- a/kafka-eagle-core/src/main/java/org/smartloli/kafka/eagle/core/factory/KafkaServiceImpl.java +++ b/kafka-eagle-core/src/main/java/org/smartloli/kafka/eagle/core/factory/KafkaServiceImpl.java @@ -63,9 +63,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartloli.kafka.eagle.common.constant.JmxConstants.KafkaServer; +import org.smartloli.kafka.eagle.common.constant.JmxConstants.KafkaServer8; import org.smartloli.kafka.eagle.common.protocol.*; import org.smartloli.kafka.eagle.common.util.CalendarUtils; import org.smartloli.kafka.eagle.common.util.SystemConfigUtils; +import org.smartloli.kafka.eagle.common.util.KConstants.CollectorType; import org.smartloli.kafka.eagle.common.util.KConstants.Kafka; import org.smartloli.kafka.eagle.common.util.KafkaPartitioner; import org.smartloli.kafka.eagle.common.util.KafkaZKPoolUtils; @@ -75,7 +77,6 @@ import com.alibaba.fastjson.JSONObject; import kafka.admin.RackAwareMode; -import kafka.admin.TopicCommand; import kafka.zk.AdminZkClient; import kafka.zk.KafkaZkClient; import scala.Option; @@ -98,6 +99,7 @@ public class KafkaServiceImpl implements KafkaService { private final String BROKER_IDS_PATH = "/brokers/ids"; private final String BROKER_TOPICS_PATH = "/brokers/topics"; + private final String DELETE_TOPICS_PATH = "/admin/delete_topics"; private final String CONSUMERS_PATH = "/consumers"; private final String TOPIC_ISR = "/brokers/topics/%s/partitions/%s/state"; private final Logger LOG = LoggerFactory.getLogger(KafkaServiceImpl.class); @@ -215,7 +217,7 @@ public String getAllBrokersInfo(String clusterAlias) { } broker.setJmxPort(JSON.parseObject(tupleString).getInteger("jmx_port")); broker.setId(++id); - broker.setVersion(getKafkaVersion(broker.getHost(), broker.getJmxPort(), ids)); + broker.setVersion(getKafkaVersion(broker.getHost(), broker.getJmxPort(), ids, clusterAlias)); targets.add(broker); } catch (Exception ex) { LOG.error(ex.getMessage()); @@ -483,21 +485,15 @@ public Map create(String clusterAlias, String topicName, String targets.put("info", "replication factor: " + replic + " larger than available brokers: " + brokers); return targets; } - String formatter = SystemConfigUtils.getProperty(clusterAlias + ".kafka.eagle.offset.storage"); - String zks = SystemConfigUtils.getProperty(clusterAlias + ".zk.list"); - if ("kafka".equals(formatter)) { - KafkaZkClient zkc = kafkaZKPool.getZkClient(clusterAlias); - AdminZkClient adminZkCli = new AdminZkClient(zkc); - adminZkCli.createTopic(topicName, Integer.parseInt(partitions), Integer.parseInt(replic), new Properties(), RackAwareMode.Enforced$.MODULE$); - if (zkc != null) { - kafkaZKPool.release(clusterAlias, zkc); - zkc = null; - adminZkCli = null; - } - } else { - String[] options = new String[] { "--create", "--zookeeper", zks, "--partitions", partitions, "--topic", topicName, "--replication-factor", replic }; - TopicCommand.main(options); + KafkaZkClient zkc = kafkaZKPool.getZkClient(clusterAlias); + AdminZkClient adminZkCli = new AdminZkClient(zkc); + adminZkCli.createTopic(topicName, Integer.parseInt(partitions), Integer.parseInt(replic), new Properties(), RackAwareMode.Enforced$.MODULE$); + if (zkc != null) { + kafkaZKPool.release(clusterAlias, zkc); + zkc = null; + adminZkCli = null; } + targets.put("status", "success"); targets.put("info", "Create topic[" + topicName + "] has successed,partitions numbers is [" + partitions + "],replication-factor numbers is [" + replic + "]"); return targets; @@ -507,16 +503,15 @@ public Map create(String clusterAlias, String topicName, String public Map delete(String clusterAlias, String topicName) { Map targets = new HashMap(); KafkaZkClient zkc = kafkaZKPool.getZkClient(clusterAlias); - String formatter = SystemConfigUtils.getProperty(clusterAlias + ".kafka.eagle.offset.storage"); - String zks = SystemConfigUtils.getProperty(clusterAlias + ".zk.list"); - if ("kafka".equals(formatter)) { - AdminZkClient adminZkCli = new AdminZkClient(zkc); - adminZkCli.deleteTopic(topicName); + AdminZkClient adminZkCli = new AdminZkClient(zkc); + adminZkCli.deleteTopic(topicName); + boolean dt = zkc.deleteRecursive(DELETE_TOPICS_PATH + "/" + topicName); + boolean bt = zkc.deleteRecursive(BROKER_TOPICS_PATH + "/" + topicName); + if (dt && bt) { + targets.put("status", "success"); } else { - String[] options = new String[] { "--delete", "--zookeeper", zks, "--topic", topicName }; - TopicCommand.main(options); + targets.put("status", "failed"); } - targets.put("status", zkc.deleteRecursive(BROKER_TOPICS_PATH + "/" + topicName) == true ? "success" : "failed"); if (zkc != null) { kafkaZKPool.release(clusterAlias, zkc); zkc = null; @@ -846,7 +841,7 @@ public long getKafkaLogSize(String clusterAlias, String topic, int partitionid) } /** Get kafka version. */ - private String getKafkaVersion(String host, int port, String ids) { + private String getKafkaVersion(String host, int port, String ids, String clusterAlias) { JMXConnector connector = null; String version = ""; String JMX = "service:jmx:rmi:///jndi/rmi://%s/jmxrmi"; @@ -854,7 +849,11 @@ private String getKafkaVersion(String host, int port, String ids) { JMXServiceURL jmxSeriverUrl = new JMXServiceURL(String.format(JMX, host + ":" + port)); connector = JMXConnectorFactory.connect(jmxSeriverUrl); MBeanServerConnection mbeanConnection = connector.getMBeanServerConnection(); - version = mbeanConnection.getAttribute(new ObjectName(String.format(KafkaServer.version, ids)), KafkaServer.value).toString(); + if (CollectorType.KAFKA.equals(SystemConfigUtils.getProperty(clusterAlias + ".kafka.eagle.offset.storage"))) { + version = mbeanConnection.getAttribute(new ObjectName(String.format(KafkaServer.version, ids)), KafkaServer.value).toString(); + } else { + version = mbeanConnection.getAttribute(new ObjectName(KafkaServer8.version), KafkaServer8.value).toString(); + } } catch (Exception ex) { LOG.error("Get kafka version from jmx has error, msg is " + ex.getMessage()); } finally { diff --git a/kafka-eagle-core/src/main/java/org/smartloli/kafka/eagle/core/factory/ZkServiceImpl.java b/kafka-eagle-core/src/main/java/org/smartloli/kafka/eagle/core/factory/ZkServiceImpl.java index 88f9e7e9..dd1e5f36 100644 --- a/kafka-eagle-core/src/main/java/org/smartloli/kafka/eagle/core/factory/ZkServiceImpl.java +++ b/kafka-eagle-core/src/main/java/org/smartloli/kafka/eagle/core/factory/ZkServiceImpl.java @@ -41,6 +41,8 @@ import kafka.zk.KafkaZkClient; import scala.Option; import scala.Tuple2; +import scala.collection.JavaConversions; +import scala.collection.Seq; /** * Implements ZkService all method. @@ -62,7 +64,7 @@ public class ZkServiceImpl implements ZkService { /** Request memory space. */ private KafkaZkClient zkc = null; /** Instance Kafka Zookeeper client pool. */ - private KafkaZKPoolUtils kafkaZKPool = KafkaZKPoolUtils.getInstance(); + private KafkaZKPoolUtils kafkaZKPool = KafkaZKPoolUtils.getInstance(); /** Zookeeper delete command. */ public String delete(String clusterAlias, String cmd) { @@ -196,7 +198,8 @@ public String ls(String clusterAlias, String cmd) { KafkaZkClient zkc = kafkaZKPool.getZkClient(clusterAlias); boolean status = zkc.pathExists(cmd); if (status) { - target = zkc.getChildren(cmd).toString(); + Seq seq = zkc.getChildren(cmd); + target = JavaConversions.seqAsJavaList(seq).toString(); } if (zkc != null) { kafkaZKPool.release(clusterAlias, zkc);