Skip to content

Commit

Permalink
[Improve](metric) Improve FE DorisMetricRegistry (apache#24773)
Browse files Browse the repository at this point in the history
The current implementation needs to iterate all metrics in a lock,
which might cause latency spikes. This PR changes the underlying
data structure to ConcurrentHashMap so that removing metrics doesn't
need to block the entire registry.
  • Loading branch information
w41ter authored Oct 7, 2023
1 parent 42c5203 commit 813c8f1
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,60 +21,111 @@

import com.google.common.collect.Lists;

import java.util.Collection;
import java.util.Comparator;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class DorisMetricRegistry {

private Collection<Metric> metrics = Lists.newArrayList();
private Collection<Metric> systemMetrics = Lists.newArrayList();
ConcurrentHashMap<String, MetricList> metrics = new ConcurrentHashMap<>();
ConcurrentHashMap<String, MetricList> systemMetrics = new ConcurrentHashMap<>();

public DorisMetricRegistry() {

}

public synchronized void addMetrics(Metric metric) {
public void addMetrics(Metric metric) {
// No metric needs to be added to the Checkpoint thread.
// And if you add a metric in Checkpoint thread, it will cause the metric to be added repeatedly,
// and the Checkpoint Catalog may be saved incorrectly, resulting in FE memory leaks.
if (!Env.isCheckpointThread()) {
metrics.add(metric);
String labelId = computeLabelId(metric.getLabels());
metrics.computeIfAbsent(metric.getName(), (k) -> new MetricList())
.addMetrics(labelId, metric);
}
}

public synchronized void addSystemMetrics(Metric sysMetric) {
public void addSystemMetrics(Metric sysMetric) {
if (!Env.isCheckpointThread()) {
systemMetrics.add(sysMetric);
String labelId = computeLabelId(sysMetric.getLabels());
systemMetrics.computeIfAbsent(sysMetric.getName(), (k) -> new MetricList())
.addMetrics(labelId, sysMetric);
}
}

public synchronized int getAllMetricSize() {
return metrics.size() + systemMetrics.size();
}

public synchronized List<Metric> getMetrics() {
return metrics.stream().sorted(Comparator.comparing(Metric::getName)).collect(Collectors.toList());
public void accept(MetricVisitor visitor) {
final List<MetricList> metricsList = Lists.newArrayList();
metrics.forEach((name, list) -> metricsList.add(list));
final List<MetricList> sysMetricsList = Lists.newArrayList();
systemMetrics.forEach((name, list) -> sysMetricsList.add(list));
for (MetricList list : metricsList) {
for (Metric metric : list.getMetrics()) {
visitor.visit(MetricVisitor.FE_PREFIX, metric);
}
}
for (MetricList list : sysMetricsList) {
for (Metric metric : list.getMetrics()) {
visitor.visit(MetricVisitor.SYS_PREFIX, metric);
}
}
}

public synchronized List<Metric> getSystemMetrics() {
return systemMetrics.stream().sorted(Comparator.comparing(Metric::getName)).collect(Collectors.toList());
// the metrics by metric name
public List<Metric> getMetricsByName(String name) {
MetricList list = metrics.get(name);
if (list == null) {
list = systemMetrics.get(name);
}
if (list == null) {
return Lists.newArrayList();
}
return list.getMetrics();
}

// the metrics by metric name
public synchronized List<Metric> getMetricsByName(String name) {
List<Metric> list = metrics.stream().filter(m -> m.getName().equals(name)).collect(Collectors.toList());
if (list.isEmpty()) {
list = systemMetrics.stream().filter(m -> m.getName().equals(name)).collect(Collectors.toList());
public void removeMetrics(String name) {
// Same reason as comment in addMetrics()
if (!Env.isCheckpointThread()) {
metrics.remove(name);
}
return list;
}

public synchronized void removeMetrics(String name) {
public void removeMetricsByNameAndLabels(String name, List<MetricLabel> labels) {
// Same reason as comment in addMetrics()
if (!Env.isCheckpointThread()) {
metrics = metrics.stream().filter(m -> !(m.getName().equals(name))).collect(Collectors.toList());
MetricList metricList = metrics.get(name);
if (metricList != null) {
String labelId = computeLabelId(labels);
metricList.removeByLabelId(labelId);
}
}
}

private static String computeLabelId(List<MetricLabel> labels) {
TreeMap<String, String> labelMap = new TreeMap<>();
for (MetricLabel label : labels) {
labelMap.put(label.getKey(), label.getValue().replace("\\", "\\\\").replace("\"", "\\\""));
}
return labelMap.entrySet()
.stream()
.map(e -> String.format("%s=\"%s\"", e.getKey(), e.getValue()))
.collect(Collectors.joining(" "));
}

public static class MetricList {
private final HashMap<String, Metric> metrics = new HashMap<>();

private synchronized void addMetrics(String labelId, Metric metric) {
metrics.put(labelId, metric);
}

private synchronized List<Metric> getMetrics() {
return new ArrayList<>(metrics.values());
}

private synchronized void removeByLabelId(String labelId) {
metrics.remove(labelId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,22 @@

public class JsonMetricVisitor extends MetricVisitor {
private int ordinal = 0;
private int metricNumber = 0;
private boolean closed = false;

public JsonMetricVisitor() {
super();
sb.append("[\n");
}

@Override
public void setMetricNumber(int metricNumber) {
this.metricNumber = metricNumber;
}

@Override
public void visitJvm(StringBuilder sb, JvmStats jvmStats) {
public void visitJvm(JvmStats jvmStats) {
return;
}

@Override
public void visit(StringBuilder sb, String prefix, @SuppressWarnings("rawtypes") Metric metric) {
if (ordinal++ == 0) {
sb.append("[\n");
public void visit(String prefix, @SuppressWarnings("rawtypes") Metric metric) {
if (ordinal++ != 0) {
sb.append(",\n");
}
sb.append("{\n\t\"tags\":\n\t{\n");
sb.append("\t\t\"metric\":\"").append(prefix).append(metric.getName()).append("\"");
Expand All @@ -66,20 +62,24 @@ public void visit(StringBuilder sb, String prefix, @SuppressWarnings("rawtypes")

// value
sb.append("\t\"value\":").append(metric.getValue().toString()).append("\n}");
if (ordinal < metricNumber) {
sb.append(",\n");
} else {
sb.append("\n]");
}
}

@Override
public void visitHistogram(StringBuilder sb, String prefix, String name, Histogram histogram) {
public void visitHistogram(String prefix, String name, Histogram histogram) {
return;
}

@Override
public void getNodeInfo(StringBuilder sb) {
public void getNodeInfo() {
return;
}

@Override
public String finish() {
if (!closed) {
sb.append("\n]");
closed = true;
}
return sb.toString();
}
}
20 changes: 6 additions & 14 deletions fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
Original file line number Diff line number Diff line change
Expand Up @@ -647,32 +647,24 @@ public static synchronized String getMetric(MetricVisitor visitor) {
// update the metrics first
updateMetrics();

StringBuilder sb = new StringBuilder();
// jvm
JvmService jvmService = new JvmService();
JvmStats jvmStats = jvmService.stats();
visitor.visitJvm(sb, jvmStats);
visitor.visitJvm(jvmStats);

visitor.setMetricNumber(DORIS_METRIC_REGISTER.getAllMetricSize());
// doris metrics
for (Metric metric : DORIS_METRIC_REGISTER.getMetrics()) {
visitor.visit(sb, MetricVisitor.FE_PREFIX, metric);
}
// system metric
for (Metric metric : DORIS_METRIC_REGISTER.getSystemMetrics()) {
visitor.visit(sb, MetricVisitor.SYS_PREFIX, metric);
}
// doris metrics and system metrics.
DORIS_METRIC_REGISTER.accept(visitor);

// histogram
SortedMap<String, Histogram> histograms = METRIC_REGISTER.getHistograms();
for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
visitor.visitHistogram(sb, MetricVisitor.FE_PREFIX, entry.getKey(), entry.getValue());
visitor.visitHistogram(MetricVisitor.FE_PREFIX, entry.getKey(), entry.getValue());
}

// node info
visitor.getNodeInfo(sb);
visitor.getNodeInfo();

return sb.toString();
return visitor.finish();
}

public static <M extends Metric<?>> AutoMappedMetric<M> addLabeledMetrics(String label, Supplier<M> metric) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,20 @@ public abstract class MetricVisitor {
// for system metrics
public static final String SYS_PREFIX = "system_";

protected StringBuilder sb = new StringBuilder();

public MetricVisitor() {
}

public abstract void setMetricNumber(int metricNumber);
public abstract void visitJvm(JvmStats jvmStats);

public abstract void visitJvm(StringBuilder sb, JvmStats jvmStats);
public abstract void visit(String prefix, Metric metric);

public abstract void visit(StringBuilder sb, String prefix, Metric metric);
public abstract void visitHistogram(String prefix, String name, Histogram histogram);

public abstract void visitHistogram(StringBuilder sb, String prefix, String name, Histogram histogram);
public abstract void getNodeInfo();

public abstract void getNodeInfo(StringBuilder sb);
public String finish() {
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,7 @@ public PrometheusMetricVisitor() {
}

@Override
public void setMetricNumber(int metricNumber) {
}

@Override
public void visitJvm(StringBuilder sb, JvmStats jvmStats) {
public void visitJvm(JvmStats jvmStats) {
// heap
sb.append(Joiner.on(" ").join(HELP, JVM_HEAP_SIZE_BYTES, "jvm heap stat\n"));
sb.append(Joiner.on(" ").join(TYPE, JVM_HEAP_SIZE_BYTES, "gauge\n"));
Expand Down Expand Up @@ -150,7 +146,7 @@ public void visitJvm(StringBuilder sb, JvmStats jvmStats) {
}

@Override
public void visit(StringBuilder sb, String prefix, @SuppressWarnings("rawtypes") Metric metric) {
public void visit(String prefix, @SuppressWarnings("rawtypes") Metric metric) {
// title
final String fullName = prefix + metric.getName();
if (!metricNames.contains(fullName)) {
Expand All @@ -176,7 +172,7 @@ public void visit(StringBuilder sb, String prefix, @SuppressWarnings("rawtypes")
}

@Override
public void visitHistogram(StringBuilder sb, String prefix, String name, Histogram histogram) {
public void visitHistogram(String prefix, String name, Histogram histogram) {
// part.part.part.k1=v1.k2=v2
List<String> names = new ArrayList<>();
List<String> tags = new ArrayList<>();
Expand Down Expand Up @@ -215,7 +211,7 @@ public void visitHistogram(StringBuilder sb, String prefix, String name, Histogr
}

@Override
public void getNodeInfo(StringBuilder sb) {
public void getNodeInfo() {
final String NODE_INFO = "node_info";
sb.append(Joiner.on(" ").join(TYPE, NODE_INFO, "gauge\n"));
sb.append(NODE_INFO).append("{type=\"fe_node_num\", state=\"total\"} ")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ public class SimpleCoreMetricVisitor extends MetricVisitor {

public static final String MAX_TABLET_COMPACTION_SCORE = "max_tablet_compaction_score";

private int ordinal = 0;
private int metricNumber = 0;

private static final Map<String, String> CORE_METRICS = Maps.newHashMap();

static {
Expand All @@ -76,12 +73,7 @@ public SimpleCoreMetricVisitor() {
}

@Override
public void setMetricNumber(int metricNumber) {
this.metricNumber = metricNumber;
}

@Override
public void visitJvm(StringBuilder sb, JvmStats jvmStats) {
public void visitJvm(JvmStats jvmStats) {
Iterator<MemoryPool> memIter = jvmStats.getMem().iterator();
while (memIter.hasNext()) {
MemoryPool memPool = memIter.next();
Expand All @@ -104,7 +96,7 @@ public void visitJvm(StringBuilder sb, JvmStats jvmStats) {
}

@Override
public void visit(StringBuilder sb, String prefix, Metric metric) {
public void visit(String prefix, Metric metric) {
if (!CORE_METRICS.containsKey(metric.getName())) {
return;
}
Expand All @@ -120,7 +112,7 @@ public void visit(StringBuilder sb, String prefix, Metric metric) {
}

@Override
public void visitHistogram(StringBuilder sb, String prefix, String name, Histogram histogram) {
public void visitHistogram(String prefix, String name, Histogram histogram) {
if (!CORE_METRICS.containsKey(name)) {
return;
}
Expand All @@ -134,7 +126,7 @@ public void visitHistogram(StringBuilder sb, String prefix, String name, Histogr
}

@Override
public void getNodeInfo(StringBuilder sb) {
public void getNodeInfo() {
long feDeadNum = Env.getCurrentEnv().getFrontends(null).stream().filter(f -> !f.isAlive()).count();
long beDeadNum = Env.getCurrentSystemInfo().getIdToBackend().values().stream().filter(b -> !b.isAlive())
.count();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,13 @@ public void testUserQueryMetrics() {
MetricRepo.USER_COUNTER_QUERY_ALL.getOrAdd("test_user").increase(1L);
MetricRepo.USER_COUNTER_QUERY_ERR.getOrAdd("test_user").increase(1L);
MetricRepo.USER_HISTO_QUERY_LATENCY.getOrAdd("test_user").update(10L);
StringBuilder sb = new StringBuilder();
MetricVisitor visitor = new PrometheusMetricVisitor();
List<Metric> metrics = MetricRepo.DORIS_METRIC_REGISTER.getMetrics();
for (Metric metric : metrics) {
visitor.visit(sb, MetricVisitor.FE_PREFIX, metric);
}
MetricRepo.DORIS_METRIC_REGISTER.accept(visitor);
SortedMap<String, Histogram> histograms = MetricRepo.METRIC_REGISTER.getHistograms();
for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
visitor.visitHistogram(sb, MetricVisitor.FE_PREFIX, entry.getKey(), entry.getValue());
visitor.visitHistogram(MetricVisitor.FE_PREFIX, entry.getKey(), entry.getValue());
}
String metricResult = sb.toString();
String metricResult = visitor.finish();
Assert.assertTrue(metricResult.contains("# TYPE doris_fe_query_total counter"));
Assert.assertTrue(metricResult.contains("doris_fe_query_total{user=\"test_user\"} 1"));
Assert.assertTrue(metricResult.contains("# TYPE doris_fe_query_err counter"));
Expand Down
Loading

0 comments on commit 813c8f1

Please sign in to comment.