From 813c8f1e5a3c9cfed79209e8ebcee249c892ba38 Mon Sep 17 00:00:00 2001 From: walter Date: Sat, 7 Oct 2023 10:25:55 +0800 Subject: [PATCH] [Improve](metric) Improve FE DorisMetricRegistry (#24773) The current implementation needs to iterate over all metrics while holding a lock, which might cause latency spikes. This PR changes the underlying data structure to ConcurrentHashMap so that removing metrics doesn't need to block the entire registry. --- .../doris/metric/DorisMetricRegistry.java | 101 +++++++++++++----- .../doris/metric/JsonMetricVisitor.java | 34 +++--- .../org/apache/doris/metric/MetricRepo.java | 20 ++-- .../apache/doris/metric/MetricVisitor.java | 14 ++- .../doris/metric/PrometheusMetricVisitor.java | 12 +-- .../doris/metric/SimpleCoreMetricVisitor.java | 16 +-- .../org/apache/doris/metric/MetricsTest.java | 10 +- .../apache/doris/mtmv/MTMVJobManagerTest.java | 17 +-- 8 files changed, 121 insertions(+), 103 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/DorisMetricRegistry.java b/fe/fe-core/src/main/java/org/apache/doris/metric/DorisMetricRegistry.java index 6a01e53dc9de6c..64e973f78342a5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/DorisMetricRegistry.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/DorisMetricRegistry.java @@ -21,60 +21,111 @@ import com.google.common.collect.Lists; -import java.util.Collection; -import java.util.Comparator; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; public class DorisMetricRegistry { - - private Collection metrics = Lists.newArrayList(); - private Collection systemMetrics = Lists.newArrayList(); + ConcurrentHashMap metrics = new ConcurrentHashMap<>(); + ConcurrentHashMap systemMetrics = new ConcurrentHashMap<>(); public DorisMetricRegistry() { } - public synchronized void addMetrics(Metric metric) { + public void addMetrics(Metric
metric) { // No metric needs to be added to the Checkpoint thread. // And if you add a metric in Checkpoint thread, it will cause the metric to be added repeatedly, // and the Checkpoint Catalog may be saved incorrectly, resulting in FE memory leaks. if (!Env.isCheckpointThread()) { - metrics.add(metric); + String labelId = computeLabelId(metric.getLabels()); + metrics.computeIfAbsent(metric.getName(), (k) -> new MetricList()) + .addMetrics(labelId, metric); } } - public synchronized void addSystemMetrics(Metric sysMetric) { + public void addSystemMetrics(Metric sysMetric) { if (!Env.isCheckpointThread()) { - systemMetrics.add(sysMetric); + String labelId = computeLabelId(sysMetric.getLabels()); + systemMetrics.computeIfAbsent(sysMetric.getName(), (k) -> new MetricList()) + .addMetrics(labelId, sysMetric); } } - public synchronized int getAllMetricSize() { - return metrics.size() + systemMetrics.size(); - } - - public synchronized List getMetrics() { - return metrics.stream().sorted(Comparator.comparing(Metric::getName)).collect(Collectors.toList()); + public void accept(MetricVisitor visitor) { + final List metricsList = Lists.newArrayList(); + metrics.forEach((name, list) -> metricsList.add(list)); + final List sysMetricsList = Lists.newArrayList(); + systemMetrics.forEach((name, list) -> sysMetricsList.add(list)); + for (MetricList list : metricsList) { + for (Metric metric : list.getMetrics()) { + visitor.visit(MetricVisitor.FE_PREFIX, metric); + } + } + for (MetricList list : sysMetricsList) { + for (Metric metric : list.getMetrics()) { + visitor.visit(MetricVisitor.SYS_PREFIX, metric); + } + } } - public synchronized List getSystemMetrics() { - return systemMetrics.stream().sorted(Comparator.comparing(Metric::getName)).collect(Collectors.toList()); + // the metrics by metric name + public List getMetricsByName(String name) { + MetricList list = metrics.get(name); + if (list == null) { + list = systemMetrics.get(name); + } + if (list == null) { + return 
Lists.newArrayList(); + } + return list.getMetrics(); } - // the metrics by metric name - public synchronized List getMetricsByName(String name) { - List list = metrics.stream().filter(m -> m.getName().equals(name)).collect(Collectors.toList()); - if (list.isEmpty()) { - list = systemMetrics.stream().filter(m -> m.getName().equals(name)).collect(Collectors.toList()); + public void removeMetrics(String name) { + // Same reason as comment in addMetrics() + if (!Env.isCheckpointThread()) { + metrics.remove(name); } - return list; } - public synchronized void removeMetrics(String name) { + public void removeMetricsByNameAndLabels(String name, List labels) { // Same reason as comment in addMetrics() if (!Env.isCheckpointThread()) { - metrics = metrics.stream().filter(m -> !(m.getName().equals(name))).collect(Collectors.toList()); + MetricList metricList = metrics.get(name); + if (metricList != null) { + String labelId = computeLabelId(labels); + metricList.removeByLabelId(labelId); + } + } + } + + private static String computeLabelId(List labels) { + TreeMap labelMap = new TreeMap<>(); + for (MetricLabel label : labels) { + labelMap.put(label.getKey(), label.getValue().replace("\\", "\\\\").replace("\"", "\\\"")); + } + return labelMap.entrySet() + .stream() + .map(e -> String.format("%s=\"%s\"", e.getKey(), e.getValue())) + .collect(Collectors.joining(" ")); + } + + public static class MetricList { + private final HashMap metrics = new HashMap<>(); + + private synchronized void addMetrics(String labelId, Metric metric) { + metrics.put(labelId, metric); + } + + private synchronized List getMetrics() { + return new ArrayList<>(metrics.values()); + } + + private synchronized void removeByLabelId(String labelId) { + metrics.remove(labelId); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/JsonMetricVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/metric/JsonMetricVisitor.java index 00cb73d8ac740d..d32db92ff58be8 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/metric/JsonMetricVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/JsonMetricVisitor.java @@ -25,26 +25,22 @@ public class JsonMetricVisitor extends MetricVisitor { private int ordinal = 0; - private int metricNumber = 0; + private boolean closed = false; public JsonMetricVisitor() { super(); + sb.append("[\n"); } @Override - public void setMetricNumber(int metricNumber) { - this.metricNumber = metricNumber; - } - - @Override - public void visitJvm(StringBuilder sb, JvmStats jvmStats) { + public void visitJvm(JvmStats jvmStats) { return; } @Override - public void visit(StringBuilder sb, String prefix, @SuppressWarnings("rawtypes") Metric metric) { - if (ordinal++ == 0) { - sb.append("[\n"); + public void visit(String prefix, @SuppressWarnings("rawtypes") Metric metric) { + if (ordinal++ != 0) { + sb.append(",\n"); } sb.append("{\n\t\"tags\":\n\t{\n"); sb.append("\t\t\"metric\":\"").append(prefix).append(metric.getName()).append("\""); @@ -66,20 +62,24 @@ public void visit(StringBuilder sb, String prefix, @SuppressWarnings("rawtypes") // value sb.append("\t\"value\":").append(metric.getValue().toString()).append("\n}"); - if (ordinal < metricNumber) { - sb.append(",\n"); - } else { - sb.append("\n]"); - } } @Override - public void visitHistogram(StringBuilder sb, String prefix, String name, Histogram histogram) { + public void visitHistogram(String prefix, String name, Histogram histogram) { return; } @Override - public void getNodeInfo(StringBuilder sb) { + public void getNodeInfo() { return; } + + @Override + public String finish() { + if (!closed) { + sb.append("\n]"); + closed = true; + } + return sb.toString(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java index c9ceb2c3702879..165e2a60c0c134 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java @@ -647,32 +647,24 @@ public static synchronized String getMetric(MetricVisitor visitor) { // update the metrics first updateMetrics(); - StringBuilder sb = new StringBuilder(); // jvm JvmService jvmService = new JvmService(); JvmStats jvmStats = jvmService.stats(); - visitor.visitJvm(sb, jvmStats); + visitor.visitJvm(jvmStats); - visitor.setMetricNumber(DORIS_METRIC_REGISTER.getAllMetricSize()); - // doris metrics - for (Metric metric : DORIS_METRIC_REGISTER.getMetrics()) { - visitor.visit(sb, MetricVisitor.FE_PREFIX, metric); - } - // system metric - for (Metric metric : DORIS_METRIC_REGISTER.getSystemMetrics()) { - visitor.visit(sb, MetricVisitor.SYS_PREFIX, metric); - } + // doris metrics and system metrics. + DORIS_METRIC_REGISTER.accept(visitor); // histogram SortedMap histograms = METRIC_REGISTER.getHistograms(); for (Map.Entry entry : histograms.entrySet()) { - visitor.visitHistogram(sb, MetricVisitor.FE_PREFIX, entry.getKey(), entry.getValue()); + visitor.visitHistogram(MetricVisitor.FE_PREFIX, entry.getKey(), entry.getValue()); } // node info - visitor.getNodeInfo(sb); + visitor.getNodeInfo(); - return sb.toString(); + return visitor.finish(); } public static > AutoMappedMetric addLabeledMetrics(String label, Supplier metric) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricVisitor.java index 575afeeeceb37e..dcbf9268b8285f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricVisitor.java @@ -31,16 +31,20 @@ public abstract class MetricVisitor { // for system metrics public static final String SYS_PREFIX = "system_"; + protected StringBuilder sb = new StringBuilder(); + public MetricVisitor() { } - public abstract void setMetricNumber(int metricNumber); + public abstract void visitJvm(JvmStats jvmStats); - public 
abstract void visitJvm(StringBuilder sb, JvmStats jvmStats); + public abstract void visit(String prefix, Metric metric); - public abstract void visit(StringBuilder sb, String prefix, Metric metric); + public abstract void visitHistogram(String prefix, String name, Histogram histogram); - public abstract void visitHistogram(StringBuilder sb, String prefix, String name, Histogram histogram); + public abstract void getNodeInfo(); - public abstract void getNodeInfo(StringBuilder sb); + public String finish() { + return sb.toString(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/PrometheusMetricVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/metric/PrometheusMetricVisitor.java index ae1316d9821918..b0cf862041592a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/PrometheusMetricVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/PrometheusMetricVisitor.java @@ -60,11 +60,7 @@ public PrometheusMetricVisitor() { } @Override - public void setMetricNumber(int metricNumber) { - } - - @Override - public void visitJvm(StringBuilder sb, JvmStats jvmStats) { + public void visitJvm(JvmStats jvmStats) { // heap sb.append(Joiner.on(" ").join(HELP, JVM_HEAP_SIZE_BYTES, "jvm heap stat\n")); sb.append(Joiner.on(" ").join(TYPE, JVM_HEAP_SIZE_BYTES, "gauge\n")); @@ -150,7 +146,7 @@ public void visitJvm(StringBuilder sb, JvmStats jvmStats) { } @Override - public void visit(StringBuilder sb, String prefix, @SuppressWarnings("rawtypes") Metric metric) { + public void visit(String prefix, @SuppressWarnings("rawtypes") Metric metric) { // title final String fullName = prefix + metric.getName(); if (!metricNames.contains(fullName)) { @@ -176,7 +172,7 @@ public void visit(StringBuilder sb, String prefix, @SuppressWarnings("rawtypes") } @Override - public void visitHistogram(StringBuilder sb, String prefix, String name, Histogram histogram) { + public void visitHistogram(String prefix, String name, Histogram histogram) { // 
part.part.part.k1=v1.k2=v2 List names = new ArrayList<>(); List tags = new ArrayList<>(); @@ -215,7 +211,7 @@ public void visitHistogram(StringBuilder sb, String prefix, String name, Histogr } @Override - public void getNodeInfo(StringBuilder sb) { + public void getNodeInfo() { final String NODE_INFO = "node_info"; sb.append(Joiner.on(" ").join(TYPE, NODE_INFO, "gauge\n")); sb.append(NODE_INFO).append("{type=\"fe_node_num\", state=\"total\"} ") diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java index 99ea967a3b1ccc..bbbd13186ee175 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java @@ -56,9 +56,6 @@ public class SimpleCoreMetricVisitor extends MetricVisitor { public static final String MAX_TABLET_COMPACTION_SCORE = "max_tablet_compaction_score"; - private int ordinal = 0; - private int metricNumber = 0; - private static final Map CORE_METRICS = Maps.newHashMap(); static { @@ -76,12 +73,7 @@ public SimpleCoreMetricVisitor() { } @Override - public void setMetricNumber(int metricNumber) { - this.metricNumber = metricNumber; - } - - @Override - public void visitJvm(StringBuilder sb, JvmStats jvmStats) { + public void visitJvm(JvmStats jvmStats) { Iterator memIter = jvmStats.getMem().iterator(); while (memIter.hasNext()) { MemoryPool memPool = memIter.next(); @@ -104,7 +96,7 @@ public void visitJvm(StringBuilder sb, JvmStats jvmStats) { } @Override - public void visit(StringBuilder sb, String prefix, Metric metric) { + public void visit(String prefix, Metric metric) { if (!CORE_METRICS.containsKey(metric.getName())) { return; } @@ -120,7 +112,7 @@ public void visit(StringBuilder sb, String prefix, Metric metric) { } @Override - public void visitHistogram(StringBuilder sb, String prefix, String name, Histogram histogram) { + public 
void visitHistogram(String prefix, String name, Histogram histogram) { if (!CORE_METRICS.containsKey(name)) { return; } @@ -134,7 +126,7 @@ public void visitHistogram(StringBuilder sb, String prefix, String name, Histogr } @Override - public void getNodeInfo(StringBuilder sb) { + public void getNodeInfo() { long feDeadNum = Env.getCurrentEnv().getFrontends(null).stream().filter(f -> !f.isAlive()).count(); long beDeadNum = Env.getCurrentSystemInfo().getIdToBackend().values().stream().filter(b -> !b.isAlive()) .count(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/metric/MetricsTest.java b/fe/fe-core/src/test/java/org/apache/doris/metric/MetricsTest.java index 41f417bec755de..162a57f3f36980 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/metric/MetricsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/metric/MetricsTest.java @@ -62,17 +62,13 @@ public void testUserQueryMetrics() { MetricRepo.USER_COUNTER_QUERY_ALL.getOrAdd("test_user").increase(1L); MetricRepo.USER_COUNTER_QUERY_ERR.getOrAdd("test_user").increase(1L); MetricRepo.USER_HISTO_QUERY_LATENCY.getOrAdd("test_user").update(10L); - StringBuilder sb = new StringBuilder(); MetricVisitor visitor = new PrometheusMetricVisitor(); - List metrics = MetricRepo.DORIS_METRIC_REGISTER.getMetrics(); - for (Metric metric : metrics) { - visitor.visit(sb, MetricVisitor.FE_PREFIX, metric); - } + MetricRepo.DORIS_METRIC_REGISTER.accept(visitor); SortedMap histograms = MetricRepo.METRIC_REGISTER.getHistograms(); for (Map.Entry entry : histograms.entrySet()) { - visitor.visitHistogram(sb, MetricVisitor.FE_PREFIX, entry.getKey(), entry.getValue()); + visitor.visitHistogram(MetricVisitor.FE_PREFIX, entry.getKey(), entry.getValue()); } - String metricResult = sb.toString(); + String metricResult = visitor.finish(); Assert.assertTrue(metricResult.contains("# TYPE doris_fe_query_total counter")); Assert.assertTrue(metricResult.contains("doris_fe_query_total{user=\"test_user\"} 1")); 
Assert.assertTrue(metricResult.contains("# TYPE doris_fe_query_err counter")); diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java index 6993299f853c54..704551c1d24d28 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java @@ -19,7 +19,6 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.DdlException; -import org.apache.doris.metric.Metric; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mtmv.MTMVUtils.JobState; import org.apache.doris.mtmv.metadata.ChangeMTMVJob; @@ -138,20 +137,8 @@ public void testOnceJob() throws DdlException, InterruptedException { @Test public void testMetrics() { - int jobMetricCount = 0; - int taskMetricCount = 0; - List metrics = MetricRepo.DORIS_METRIC_REGISTER.getMetrics(); - for (Metric metric : metrics) { - if (metric.getName().startsWith("mtmv")) { - if (metric.getName().equals("mtmv_job")) { - jobMetricCount++; - } else if (metric.getName().equals("mtmv_task")) { - taskMetricCount++; - } else { - Assertions.fail(); - } - } - } + int jobMetricCount = MetricRepo.DORIS_METRIC_REGISTER.getMetricsByName("mtmv_job").size(); + int taskMetricCount = MetricRepo.DORIS_METRIC_REGISTER.getMetricsByName("mtmv_task").size(); Assertions.assertEquals(2, jobMetricCount); Assertions.assertEquals(4, taskMetricCount); }