fixed create & delete topic
smartloli committed Jan 18, 2019
1 parent cfdd8d8 commit a3845c0
Showing 3 changed files with 35 additions and 28 deletions.
JmxConstants.java
@@ -35,6 +35,11 @@ public interface KafkaNetWork {
// TODO
}

public interface KafkaServer8{
public static final String version = "kafka.common:type=AppInfo,name=Version";
public static final String value = "Value";
}

public interface KafkaServer {
class BrokerTopicMetrics {
public static String bytesInPerSec = "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec";
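The new KafkaServer8 constants point at the kafka.common AppInfo MBean, which is where older 0.8-style brokers expose their version string; newer brokers are read through the pattern held in KafkaServer.version, which getKafkaVersion formats with the broker id. An illustrative, standalone sketch of reading that attribute over JMX; the host and port are placeholders, not values from this commit:

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    // Sketch only, not part of this commit: probe a Kafka 0.8-style broker for its version.
    public class Kafka8VersionProbe {
        public static void main(String[] args) throws Exception {
            String url = String.format("service:jmx:rmi:///jndi/rmi://%s/jmxrmi", "127.0.0.1:9999");
            JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(url));
            try {
                MBeanServerConnection mbean = connector.getMBeanServerConnection();
                // Same ObjectName/attribute pair as KafkaServer8.version and KafkaServer8.value above.
                Object version = mbean.getAttribute(
                        new ObjectName("kafka.common:type=AppInfo,name=Version"), "Value");
                System.out.println("Kafka version: " + version);
            } finally {
                connector.close();
            }
        }
    }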
KafkaServiceImpl.java
@@ -63,9 +63,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartloli.kafka.eagle.common.constant.JmxConstants.KafkaServer;
import org.smartloli.kafka.eagle.common.constant.JmxConstants.KafkaServer8;
import org.smartloli.kafka.eagle.common.protocol.*;
import org.smartloli.kafka.eagle.common.util.CalendarUtils;
import org.smartloli.kafka.eagle.common.util.SystemConfigUtils;
import org.smartloli.kafka.eagle.common.util.KConstants.CollectorType;
import org.smartloli.kafka.eagle.common.util.KConstants.Kafka;
import org.smartloli.kafka.eagle.common.util.KafkaPartitioner;
import org.smartloli.kafka.eagle.common.util.KafkaZKPoolUtils;
@@ -75,7 +77,6 @@
import com.alibaba.fastjson.JSONObject;

import kafka.admin.RackAwareMode;
import kafka.admin.TopicCommand;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import scala.Option;
@@ -98,6 +99,7 @@ public class KafkaServiceImpl implements KafkaService {

private final String BROKER_IDS_PATH = "/brokers/ids";
private final String BROKER_TOPICS_PATH = "/brokers/topics";
private final String DELETE_TOPICS_PATH = "/admin/delete_topics";
private final String CONSUMERS_PATH = "/consumers";
private final String TOPIC_ISR = "/brokers/topics/%s/partitions/%s/state";
private final Logger LOG = LoggerFactory.getLogger(KafkaServiceImpl.class);
@@ -215,7 +217,7 @@ public String getAllBrokersInfo(String clusterAlias) {
}
broker.setJmxPort(JSON.parseObject(tupleString).getInteger("jmx_port"));
broker.setId(++id);
broker.setVersion(getKafkaVersion(broker.getHost(), broker.getJmxPort(), ids));
broker.setVersion(getKafkaVersion(broker.getHost(), broker.getJmxPort(), ids, clusterAlias));
targets.add(broker);
} catch (Exception ex) {
LOG.error(ex.getMessage());
@@ -483,21 +485,15 @@ public Map<String, Object> create(String clusterAlias, String topicName, String
targets.put("info", "replication factor: " + replic + " larger than available brokers: " + brokers);
return targets;
}
String formatter = SystemConfigUtils.getProperty(clusterAlias + ".kafka.eagle.offset.storage");
String zks = SystemConfigUtils.getProperty(clusterAlias + ".zk.list");
if ("kafka".equals(formatter)) {
KafkaZkClient zkc = kafkaZKPool.getZkClient(clusterAlias);
AdminZkClient adminZkCli = new AdminZkClient(zkc);
adminZkCli.createTopic(topicName, Integer.parseInt(partitions), Integer.parseInt(replic), new Properties(), RackAwareMode.Enforced$.MODULE$);
if (zkc != null) {
kafkaZKPool.release(clusterAlias, zkc);
zkc = null;
adminZkCli = null;
}
} else {
String[] options = new String[] { "--create", "--zookeeper", zks, "--partitions", partitions, "--topic", topicName, "--replication-factor", replic };
TopicCommand.main(options);
KafkaZkClient zkc = kafkaZKPool.getZkClient(clusterAlias);
AdminZkClient adminZkCli = new AdminZkClient(zkc);
adminZkCli.createTopic(topicName, Integer.parseInt(partitions), Integer.parseInt(replic), new Properties(), RackAwareMode.Enforced$.MODULE$);
if (zkc != null) {
kafkaZKPool.release(clusterAlias, zkc);
zkc = null;
adminZkCli = null;
}

targets.put("status", "success");
targets.put("info", "Create topic[" + topicName + "] has successed,partitions numbers is [" + partitions + "],replication-factor numbers is [" + replic + "]");
return targets;
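With this change, create() no longer branches on the offset-storage setting and never falls back to TopicCommand.main; topic creation always goes through AdminZkClient against the pooled KafkaZkClient. A hedged sketch of the same call with the pool release moved into a finally block, reusing kafkaZKPool, clusterAlias, topicName, partitions and replic exactly as they appear in KafkaServiceImpl; this is an illustration, not the committed code:

    // Illustrative sketch, not part of this commit.
    KafkaZkClient zkc = kafkaZKPool.getZkClient(clusterAlias);
    try {
        AdminZkClient adminZkCli = new AdminZkClient(zkc);
        adminZkCli.createTopic(topicName, Integer.parseInt(partitions), Integer.parseInt(replic),
                new Properties(), RackAwareMode.Enforced$.MODULE$);
    } finally {
        // Return the client to the pool even if createTopic throws.
        kafkaZKPool.release(clusterAlias, zkc);
    }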
@@ -507,16 +503,15 @@ public Map<String, Object> create(String clusterAlias, String topicName, String
public Map<String, Object> delete(String clusterAlias, String topicName) {
Map<String, Object> targets = new HashMap<String, Object>();
KafkaZkClient zkc = kafkaZKPool.getZkClient(clusterAlias);
String formatter = SystemConfigUtils.getProperty(clusterAlias + ".kafka.eagle.offset.storage");
String zks = SystemConfigUtils.getProperty(clusterAlias + ".zk.list");
if ("kafka".equals(formatter)) {
AdminZkClient adminZkCli = new AdminZkClient(zkc);
adminZkCli.deleteTopic(topicName);
AdminZkClient adminZkCli = new AdminZkClient(zkc);
adminZkCli.deleteTopic(topicName);
boolean dt = zkc.deleteRecursive(DELETE_TOPICS_PATH + "/" + topicName);
boolean bt = zkc.deleteRecursive(BROKER_TOPICS_PATH + "/" + topicName);
if (dt && bt) {
targets.put("status", "success");
} else {
String[] options = new String[] { "--delete", "--zookeeper", zks, "--topic", topicName };
TopicCommand.main(options);
targets.put("status", "failed");
}
targets.put("status", zkc.deleteRecursive(BROKER_TOPICS_PATH + "/" + topicName) == true ? "success" : "failed");
if (zkc != null) {
kafkaZKPool.release(clusterAlias, zkc);
zkc = null;
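delete() now always issues AdminZkClient.deleteTopic and then removes the /admin/delete_topics/<topic> and /brokers/topics/<topic> znodes itself, reporting success only when both removals go through. A hedged sketch of checking the outcome from a client, using the same pool and paths as KafkaServiceImpl (illustration only):

    // Illustrative sketch, not part of this commit: confirm the topic is fully gone.
    KafkaZkClient zkc = kafkaZKPool.getZkClient(clusterAlias);
    try {
        boolean pendingDelete = zkc.pathExists("/admin/delete_topics/" + topicName);
        boolean topicNode = zkc.pathExists("/brokers/topics/" + topicName);
        System.out.println("deleted = " + (!pendingDelete && !topicNode));
    } finally {
        kafkaZKPool.release(clusterAlias, zkc);
    }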
@@ -846,15 +841,19 @@ public long getKafkaLogSize(String clusterAlias, String topic, int partitionid)
}

/** Get kafka version. */
private String getKafkaVersion(String host, int port, String ids) {
private String getKafkaVersion(String host, int port, String ids, String clusterAlias) {
JMXConnector connector = null;
String version = "";
String JMX = "service:jmx:rmi:///jndi/rmi://%s/jmxrmi";
try {
JMXServiceURL jmxSeriverUrl = new JMXServiceURL(String.format(JMX, host + ":" + port));
connector = JMXConnectorFactory.connect(jmxSeriverUrl);
MBeanServerConnection mbeanConnection = connector.getMBeanServerConnection();
version = mbeanConnection.getAttribute(new ObjectName(String.format(KafkaServer.version, ids)), KafkaServer.value).toString();
if (CollectorType.KAFKA.equals(SystemConfigUtils.getProperty(clusterAlias + ".kafka.eagle.offset.storage"))) {
version = mbeanConnection.getAttribute(new ObjectName(String.format(KafkaServer.version, ids)), KafkaServer.value).toString();
} else {
version = mbeanConnection.getAttribute(new ObjectName(KafkaServer8.version), KafkaServer8.value).toString();
}
} catch (Exception ex) {
LOG.error("Get kafka version from jmx has error, msg is " + ex.getMessage());
} finally {
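getKafkaVersion() now takes the cluster alias so it can pick the right MBean per cluster: when the <alias>.kafka.eagle.offset.storage property matches CollectorType.KAFKA it keeps using the broker-id pattern in KafkaServer.version, otherwise it falls back to the 0.8-style KafkaServer8 bean. A minimal sketch of that decision in isolation, assuming the same helper classes as KafkaServiceImpl and a placeholder alias of cluster1:

    // Illustrative sketch, not part of this commit.
    String storage = SystemConfigUtils.getProperty("cluster1" + ".kafka.eagle.offset.storage");
    String objectName = CollectorType.KAFKA.equals(storage)
            ? String.format(KafkaServer.version, ids) // per-broker bean, ids as passed from getAllBrokersInfo()
            : KafkaServer8.version;                   // kafka.common AppInfo bean for 0.8-style clusters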
ZkServiceImpl.java
@@ -41,6 +41,8 @@
import kafka.zk.KafkaZkClient;
import scala.Option;
import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.Seq;

/**
* Implements ZkService all method.
@@ -62,7 +64,7 @@ public class ZkServiceImpl implements ZkService {
/** Request memory space. */
private KafkaZkClient zkc = null;
/** Instance Kafka Zookeeper client pool. */
private KafkaZKPoolUtils kafkaZKPool = KafkaZKPoolUtils.getInstance();
private KafkaZKPoolUtils kafkaZKPool = KafkaZKPoolUtils.getInstance();

/** Zookeeper delete command. */
public String delete(String clusterAlias, String cmd) {
@@ -196,7 +198,8 @@ public String ls(String clusterAlias, String cmd) {
KafkaZkClient zkc = kafkaZKPool.getZkClient(clusterAlias);
boolean status = zkc.pathExists(cmd);
if (status) {
target = zkc.getChildren(cmd).toString();
Seq<String> seq = zkc.getChildren(cmd);
target = JavaConversions.seqAsJavaList(seq).toString();
}
if (zkc != null) {
kafkaZKPool.release(clusterAlias, zkc);
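KafkaZkClient.getChildren returns a Scala Seq, whose toString renders Scala-style (for example Buffer(topic-a, topic-b), depending on the concrete collection); converting through JavaConversions.seqAsJavaList first gives the familiar [topic-a, topic-b] form. A standalone sketch of the difference; the topic names are placeholders:

    import java.util.Arrays;
    import scala.collection.JavaConversions;
    import scala.collection.Seq;

    // Sketch only, not part of this commit: Scala Seq toString vs. Java List toString.
    public class SeqToStringDemo {
        public static void main(String[] args) {
            Seq<String> children = JavaConversions.asScalaBuffer(Arrays.asList("topic-a", "topic-b"));
            System.out.println(children);                                // Scala-style, e.g. Buffer(topic-a, topic-b)
            System.out.println(JavaConversions.seqAsJavaList(children)); // Java-style: [topic-a, topic-b]
        }
    }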
