Skip to content

Commit

Permalink
fixed email & im alert conditions smartloli#129
Browse files Browse the repository at this point in the history
  • Loading branch information
smartloli committed Jan 7, 2019
1 parent a602493 commit 9a64b40
Showing 1 changed file with 85 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,56 +161,53 @@ public void consumer(String clusterAlias) {
}

private void alert(String clusterAlias, List<OffsetsLiteInfo> offsetLites) {
boolean enableAlert = SystemConfigUtils.getBooleanProperty("kafka.eagle.mail.enable");
if (enableAlert) {
for (OffsetsLiteInfo offset : offsetLites) {
Map<String, Object> params = new HashMap<>();
params.put("cluster", clusterAlias);
params.put("group", offset.getGroup());
params.put("topic", offset.getTopic());
for (OffsetsLiteInfo offset : offsetLites) {
Map<String, Object> params = new HashMap<>();
params.put("cluster", clusterAlias);
params.put("group", offset.getGroup());
params.put("topic", offset.getTopic());

AlertServiceImpl alertService = StartupListener.getBean("alertServiceImpl", AlertServiceImpl.class);
AlertInfo alertInfo = alertService.findAlertByCGT(params);
AlertServiceImpl alertService = StartupListener.getBean("alertServiceImpl", AlertServiceImpl.class);
AlertInfo alertInfo = alertService.findAlertByCGT(params);

if (alertInfo != null && offset.getLag() > alertInfo.getLag()) {
// Mail
try {
MailProvider provider = new MailFactory();
String subject = "Kafka Eagle Consumer Alert";
String address = alertInfo.getOwner();
LagContentModule lcm = new LagContentModule();
lcm.setCluster(clusterAlias);
lcm.setConsumerLag(offset.getLag() + "");
lcm.setGroup(alertInfo.getGroup());
lcm.setLagThreshold(alertInfo.getLag() + "");
lcm.setTime(CalendarUtils.getDate());
lcm.setTopic(alertInfo.getTopic());
lcm.setType("Consumer");
lcm.setUser(alertInfo.getOwner());
provider.create().send(subject, address, lcm.toString(), "");
} catch (Exception ex) {
ex.printStackTrace();
LOG.error("Topic[" + alertInfo.getTopic() + "] Send alarm mail has error,msg is " + ex.getMessage());
}
if (alertInfo != null && offset.getLag() > alertInfo.getLag()) {
// Mail
try {
MailProvider provider = new MailFactory();
String subject = "Kafka Eagle Consumer Alert";
String address = alertInfo.getOwner();
LagContentModule lcm = new LagContentModule();
lcm.setCluster(clusterAlias);
lcm.setConsumerLag(offset.getLag() + "");
lcm.setGroup(alertInfo.getGroup());
lcm.setLagThreshold(alertInfo.getLag() + "");
lcm.setTime(CalendarUtils.getDate());
lcm.setTopic(alertInfo.getTopic());
lcm.setType("Consumer");
lcm.setUser(alertInfo.getOwner());
provider.create().send(subject, address, lcm.toString(), "");
} catch (Exception ex) {
ex.printStackTrace();
LOG.error("Topic[" + alertInfo.getTopic() + "] Send alarm mail has error,msg is " + ex.getMessage());
}

// IM (WeChat & DingDing)
try {
IMProvider provider = new IMFactory();
LagContentModule lcm = new LagContentModule();
lcm.setCluster(clusterAlias);
lcm.setConsumerLag(offset.getLag() + "");
lcm.setGroup(alertInfo.getGroup());
lcm.setLagThreshold(alertInfo.getLag() + "");
lcm.setTime(CalendarUtils.getDate());
lcm.setTopic(alertInfo.getTopic());
lcm.setType("Consumer");
lcm.setUser(alertInfo.getOwner());
provider.create().sendJsonMsgByWeChat(lcm.toWeChatMarkDown());
provider.create().sendJsonMsgByDingDing(lcm.toDingDingMarkDown());
} catch (Exception ex) {
ex.printStackTrace();
LOG.error("Topic[" + alertInfo.getTopic() + "] Send alarm wechat or dingding has error,msg is " + ex.getMessage());
}
// IM (WeChat & DingDing)
try {
IMProvider provider = new IMFactory();
LagContentModule lcm = new LagContentModule();
lcm.setCluster(clusterAlias);
lcm.setConsumerLag(offset.getLag() + "");
lcm.setGroup(alertInfo.getGroup());
lcm.setLagThreshold(alertInfo.getLag() + "");
lcm.setTime(CalendarUtils.getDate());
lcm.setTopic(alertInfo.getTopic());
lcm.setType("Consumer");
lcm.setUser(alertInfo.getOwner());
provider.create().sendJsonMsgByWeChat(lcm.toWeChatMarkDown());
provider.create().sendJsonMsgByDingDing(lcm.toDingDingMarkDown());
} catch (Exception ex) {
ex.printStackTrace();
LOG.error("Topic[" + alertInfo.getTopic() + "] Send alarm wechat or dingding has error,msg is " + ex.getMessage());
}
}
}
Expand Down Expand Up @@ -259,61 +256,53 @@ private String getStatsPerDate() {
class Cluster {

public void cluster() {
boolean enableMailAlert = SystemConfigUtils.getBooleanProperty("kafka.eagle.mail.enable");
boolean enableIMDingDingAlert = SystemConfigUtils.getBooleanProperty("kafka.eagle.im.dingding.enable");
boolean enableWeChatAlert = SystemConfigUtils.getBooleanProperty("kafka.eagle.im.wechat.enable");
if (enableMailAlert || enableIMDingDingAlert || enableWeChatAlert) {
AlertServiceImpl alertService = StartupListener.getBean("alertServiceImpl", AlertServiceImpl.class);
for (ClustersInfo cluster : alertService.historys()) {
String[] servers = cluster.getServer().split(",");
for (String server : servers) {
String host = server.split(":")[0];
int port = 0;
try {
port = Integer.parseInt(server.split(":")[1]);
boolean status = NetUtils.telnet(host, port);
if (!status) {
// Mail
try {
MailProvider provider = new MailFactory();
String subject = "Kafka Eagle Alert";
ClusterContentModule ccm = new ClusterContentModule();
ccm.setCluster(cluster.getCluster());
ccm.setServer(host + ":" + port);
ccm.setTime(CalendarUtils.getDate());
ccm.setType(cluster.getType());
ccm.setUser(cluster.getOwner());
provider.create().send(subject, cluster.getOwner(), ccm.toString(), "");
} catch (Exception ex) {
ex.printStackTrace();
LOG.error("Alertor[" + cluster.getOwner() + "] Send alarm mail has error,msg is " + ex.getMessage());
}

// IM (WeChat & DingDing)
try {
IMProvider provider = new IMFactory();
ClusterContentModule ccm = new ClusterContentModule();
ccm.setCluster(cluster.getCluster());
ccm.setServer(host + ":" + port);
ccm.setTime(CalendarUtils.getDate());
ccm.setType(cluster.getType());
ccm.setUser(cluster.getOwner());
provider.create().sendJsonMsgByDingDing(ccm.toDingDingMarkDown());
provider.create().sendJsonMsgByWeChat(ccm.toWeChatMarkDown());
} catch (Exception ex) {
ex.printStackTrace();
LOG.error("Send alarm wechat or dingding has error,msg is " + ex.getMessage());
}
AlertServiceImpl alertService = StartupListener.getBean("alertServiceImpl", AlertServiceImpl.class);
for (ClustersInfo cluster : alertService.historys()) {
String[] servers = cluster.getServer().split(",");
for (String server : servers) {
String host = server.split(":")[0];
int port = 0;
try {
port = Integer.parseInt(server.split(":")[1]);
boolean status = NetUtils.telnet(host, port);
if (!status) {
// Mail
try {
MailProvider provider = new MailFactory();
String subject = "Kafka Eagle Alert";
ClusterContentModule ccm = new ClusterContentModule();
ccm.setCluster(cluster.getCluster());
ccm.setServer(host + ":" + port);
ccm.setTime(CalendarUtils.getDate());
ccm.setType(cluster.getType());
ccm.setUser(cluster.getOwner());
provider.create().send(subject, cluster.getOwner(), ccm.toString(), "");
} catch (Exception ex) {
ex.printStackTrace();
LOG.error("Alertor[" + cluster.getOwner() + "] Send alarm mail has error,msg is " + ex.getMessage());
}

} catch (Exception e) {
LOG.error("Parse port[" + server.split(":")[1] + "] has error, msg is " + e.getMessage());
// IM (WeChat & DingDing)
try {
IMProvider provider = new IMFactory();
ClusterContentModule ccm = new ClusterContentModule();
ccm.setCluster(cluster.getCluster());
ccm.setServer(host + ":" + port);
ccm.setTime(CalendarUtils.getDate());
ccm.setType(cluster.getType());
ccm.setUser(cluster.getOwner());
provider.create().sendJsonMsgByDingDing(ccm.toDingDingMarkDown());
provider.create().sendJsonMsgByWeChat(ccm.toWeChatMarkDown());
} catch (Exception ex) {
ex.printStackTrace();
LOG.error("Send alarm wechat or dingding has error,msg is " + ex.getMessage());
}
}
} catch (Exception e) {
LOG.error("Parse port[" + server.split(":")[1] + "] has error, msg is " + e.getMessage());
}
}

}

}
}

Expand Down

0 comments on commit 9a64b40

Please sign in to comment.