Skip to content

Commit

Permalink
web and third-party thread pool monitoring data is reported to es (#1576
Browse files Browse the repository at this point in the history
)

* test:Test BeanUtil tool

* feature: web and adapter thread pools monitor data reporting es
  • Loading branch information
theNorthWindBlow authored Oct 10, 2024
1 parent 0d3740e commit a5d1d6f
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,10 @@ private static void handleElasticSearch(MonitorHandlerTypeEnum type, MonitorHand
context.monitors.add(new DynamicThreadPoolElasticSearchMonitorHandler(context.threadPoolRunStateHandler));
break;
case WEB:
context.monitors.add(new WebThreadPoolElasticSearchMonitorHandler(context.threadPoolRunStateHandler));
context.monitors.add(new WebThreadPoolElasticSearchMonitorHandler());
break;
case ADAPTER:
context.monitors.add(new AdapterThreadPoolElasticSearchMonitorHandler(context.threadPoolRunStateHandler));
context.monitors.add(new AdapterThreadPoolElasticSearchMonitorHandler());
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ public DynamicThreadPoolElasticSearchMonitorHandler dynamicThreadPoolElasticSear
@Bean
@ConditionalOnExpression("'${spring.dynamic.thread-pool.monitor.thread-pool-types:}'.contains('web')")
public WebThreadPoolElasticSearchMonitorHandler webThreadPoolElasticSearchMonitorHandler(ThreadPoolRunStateHandler handler) {
return new WebThreadPoolElasticSearchMonitorHandler(handler);
return new WebThreadPoolElasticSearchMonitorHandler();
}

@Bean
@ConditionalOnExpression("'${spring.dynamic.thread-pool.monitor.thread-pool-types:}'.contains('adapter')")
public AdapterThreadPoolElasticSearchMonitorHandler adapterThreadPoolElasticSearchMonitorHandler(ThreadPoolRunStateHandler handler) {
return new AdapterThreadPoolElasticSearchMonitorHandler(handler);
return new AdapterThreadPoolElasticSearchMonitorHandler();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,144 @@

package cn.hippo4j.monitor.elasticsearch;

import cn.hippo4j.common.model.ThreadPoolAdapterState;
import cn.hippo4j.common.model.ThreadPoolRunStateInfo;
import cn.hippo4j.common.toolkit.BeanUtil;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.core.config.ApplicationContextHolder;
import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler;
import cn.hippo4j.core.toolkit.FileUtil;
import cn.hippo4j.monitor.base.AbstractAdapterThreadPoolMonitor;
import cn.hippo4j.monitor.base.AbstractDynamicThreadPoolMonitor;
import cn.hippo4j.monitor.elasticsearch.model.ElasticSearchThreadPoolRunStateInfo;
import cn.hippo4j.threadpool.monitor.support.MonitorTypeEnum;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.core.env.Environment;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Adapter thread-pool elastic-search monitor handler.
*/
@Slf4j
public class AdapterThreadPoolElasticSearchMonitorHandler extends AbstractDynamicThreadPoolMonitor {
public class AdapterThreadPoolElasticSearchMonitorHandler extends AbstractAdapterThreadPoolMonitor {

public AdapterThreadPoolElasticSearchMonitorHandler(ThreadPoolRunStateHandler handler) {
super(handler);
private AtomicBoolean isIndexExist = null;
@Override
protected void execute(ThreadPoolAdapterState threadPoolAdapterState) {
ElasticSearchThreadPoolRunStateInfo esThreadPoolRunStateInfo = BeanUtil.convert(threadPoolAdapterState, ElasticSearchThreadPoolRunStateInfo.class);
Environment environment = ApplicationContextHolder.getInstance().getEnvironment();
String indexName = environment.getProperty("es.thread-pool-state.index.name", "adapter-thread-pool-state");
String applicationName = environment.getProperty("spring.application.name", "application");
if (!this.isExists(indexName)) {
List<String> rawMapping = FileUtil.readLines("mapping.json", StandardCharsets.UTF_8);
String mapping = String.join(" ", rawMapping);
// if index doesn't exsit, this function may try to create one, but recommend to create index manually.
this.createIndex(AdapterThreadPoolElasticSearchMonitorHandler.EsIndex.builder().index(indexName).type("_doc").mapping(mapping).build());
}
esThreadPoolRunStateInfo.setApplicationName(applicationName);
esThreadPoolRunStateInfo.setId(indexName + "-" + System.currentTimeMillis());
this.log2Es(esThreadPoolRunStateInfo, indexName);
}

@Override
protected void execute(ThreadPoolRunStateInfo poolRunStateInfo) {
// TODO
public void log2Es(ElasticSearchThreadPoolRunStateInfo esThreadPoolRunStateInfo, String indexName) {
RestHighLevelClient client = ElasticSearchClientHolder.getClient();
try {
IndexRequest request = new IndexRequest(indexName, "_doc");
request.id(esThreadPoolRunStateInfo.getId());
String stateJson = JSONUtil.toJSONString(esThreadPoolRunStateInfo);
request.source(stateJson, XContentType.JSON);
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
log.info("write thread-pool state to es, id is :{}", response.getId());
} catch (Exception ex) {
log.error("es index error, the exception was thrown in create index. name:{},type:{},id:{}. {} ",
indexName,
"_doc",
esThreadPoolRunStateInfo.getId(),
ex);
}
}

public synchronized boolean isExists(String index) {
if (Objects.isNull(isIndexExist)) {
boolean exists = false;
GetIndexRequest request = new GetIndexRequest(index);
try {
RestHighLevelClient client = ElasticSearchClientHolder.getClient();
exists = client.indices().exists(request, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("check es index fail");
}
isIndexExist = new AtomicBoolean(exists);
}
return isIndexExist.get();
}

public void createIndex(AdapterThreadPoolElasticSearchMonitorHandler.EsIndex esIndex) {
RestHighLevelClient client = ElasticSearchClientHolder.getClient();
boolean acknowledged = false;
CreateIndexRequest request = new CreateIndexRequest(esIndex.getIndex());
if (StringUtils.hasText(esIndex.getMapping())) {
request.mapping(esIndex.getType(), esIndex.getMapping(), XContentType.JSON);
}
if (!Objects.isNull(esIndex.getShards()) && !Objects.isNull(esIndex.getReplicas())) {
request.settings(Settings.builder()
.put("index.number_of_shards", esIndex.getShards())
.put("index.number_of_replicas", esIndex.getReplicas()));
}
if (StringUtils.hasText(esIndex.getAlias())) {
request.alias(new Alias(esIndex.getAlias()));
}
try {
CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
acknowledged = createIndexResponse.isAcknowledged();
} catch (IOException e) {
log.error("create es index exception", e);
}
if (acknowledged) {
log.info("create es index success");
isIndexExist.set(true);
} else {
log.error("create es index fail");
throw new RuntimeException("cannot auto create thread-pool state es index");
}
}

@Override
public String getType() {
return MonitorTypeEnum.ELASTICSEARCH.name().toLowerCase();
}

/**
* Es Index
*/
@Getter
@Builder
private static class EsIndex {

String index;
String type;
String mapping;
Integer shards;
Integer replicas;
String alias;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,140 @@
package cn.hippo4j.monitor.elasticsearch;

import cn.hippo4j.common.model.ThreadPoolRunStateInfo;
import cn.hippo4j.common.toolkit.BeanUtil;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.core.config.ApplicationContextHolder;
import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler;
import cn.hippo4j.core.toolkit.FileUtil;
import cn.hippo4j.monitor.base.AbstractDynamicThreadPoolMonitor;
import cn.hippo4j.monitor.base.AbstractWebThreadPoolMonitor;
import cn.hippo4j.monitor.elasticsearch.model.ElasticSearchThreadPoolRunStateInfo;
import cn.hippo4j.threadpool.monitor.support.MonitorTypeEnum;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.core.env.Environment;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Web thread-pool elastic-search monitor handler.
*/
@Slf4j
public class WebThreadPoolElasticSearchMonitorHandler extends AbstractDynamicThreadPoolMonitor {

public WebThreadPoolElasticSearchMonitorHandler(ThreadPoolRunStateHandler handler) {
super(handler);
}
public class WebThreadPoolElasticSearchMonitorHandler extends AbstractWebThreadPoolMonitor {

private AtomicBoolean isIndexExist = null;
@Override
protected void execute(ThreadPoolRunStateInfo poolRunStateInfo) {
// TODO
ElasticSearchThreadPoolRunStateInfo esThreadPoolRunStateInfo = BeanUtil.convert(poolRunStateInfo, ElasticSearchThreadPoolRunStateInfo.class);
Environment environment = ApplicationContextHolder.getInstance().getEnvironment();
String indexName = environment.getProperty("es.thread-pool-state.index.name", "web-thread-pool-state");
String applicationName = environment.getProperty("spring.application.name", "application");
if (!this.isExists(indexName)) {
List<String> rawMapping = FileUtil.readLines("mapping.json", StandardCharsets.UTF_8);
String mapping = String.join(" ", rawMapping);
// if index doesn't exsit, this function may try to create one, but recommend to create index manually.
this.createIndex(WebThreadPoolElasticSearchMonitorHandler.EsIndex.builder().index(indexName).type("_doc").mapping(mapping).build());
}
esThreadPoolRunStateInfo.setApplicationName(applicationName);
esThreadPoolRunStateInfo.setId(indexName + "-" + System.currentTimeMillis());
this.log2Es(esThreadPoolRunStateInfo, indexName);
}

public void log2Es(ElasticSearchThreadPoolRunStateInfo esThreadPoolRunStateInfo, String indexName) {
RestHighLevelClient client = ElasticSearchClientHolder.getClient();
try {
IndexRequest request = new IndexRequest(indexName, "_doc");
request.id(esThreadPoolRunStateInfo.getId());
String stateJson = JSONUtil.toJSONString(esThreadPoolRunStateInfo);
request.source(stateJson, XContentType.JSON);
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
log.info("write thread-pool state to es, id is :{}", response.getId());
} catch (Exception ex) {
log.error("es index error, the exception was thrown in create index. name:{},type:{},id:{}. {} ",
indexName,
"_doc",
esThreadPoolRunStateInfo.getId(),
ex);
}
}

public synchronized boolean isExists(String index) {
if (Objects.isNull(isIndexExist)) {
boolean exists = false;
GetIndexRequest request = new GetIndexRequest(index);
try {
RestHighLevelClient client = ElasticSearchClientHolder.getClient();
exists = client.indices().exists(request, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("check es index fail");
}
isIndexExist = new AtomicBoolean(exists);
}
return isIndexExist.get();
}

public void createIndex(WebThreadPoolElasticSearchMonitorHandler.EsIndex esIndex) {
RestHighLevelClient client = ElasticSearchClientHolder.getClient();
boolean acknowledged = false;
CreateIndexRequest request = new CreateIndexRequest(esIndex.getIndex());
if (StringUtils.hasText(esIndex.getMapping())) {
request.mapping(esIndex.getType(), esIndex.getMapping(), XContentType.JSON);
}
if (!Objects.isNull(esIndex.getShards()) && !Objects.isNull(esIndex.getReplicas())) {
request.settings(Settings.builder()
.put("index.number_of_shards", esIndex.getShards())
.put("index.number_of_replicas", esIndex.getReplicas()));
}
if (StringUtils.hasText(esIndex.getAlias())) {
request.alias(new Alias(esIndex.getAlias()));
}
try {
CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
acknowledged = createIndexResponse.isAcknowledged();
} catch (IOException e) {
log.error("create es index exception", e);
}
if (acknowledged) {
log.info("create es index success");
isIndexExist.set(true);
} else {
log.error("create es index fail");
throw new RuntimeException("cannot auto create thread-pool state es index");
}
}
@Override
public String getType() {
return MonitorTypeEnum.ELASTICSEARCH.name().toLowerCase();
}
}

/**
* Es Index
*/
@Getter
@Builder
private static class EsIndex {

String index;
String type;
String mapping;
Integer shards;
Integer replicas;
String alias;
}
}

0 comments on commit a5d1d6f

Please sign in to comment.