Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix kafka same tags #1896

Merged
merged 7 commits into from
Mar 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.micrometer.core.lang.NonNullApi;
import io.micrometer.core.lang.NonNullFields;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -57,7 +56,6 @@ class KafkaMetrics implements MeterBinder {

private final Supplier<Map<MetricName, ? extends Metric>> metricsSupplier;
private final Iterable<Tag> extraTags;
private final int extraTagsSize;

/**
* Keeps track of current set of metrics.
Expand All @@ -73,25 +71,21 @@ class KafkaMetrics implements MeterBinder {
KafkaMetrics(Supplier<Map<MetricName, ? extends Metric>> metricsSupplier, Iterable<Tag> extraTags) {
this.metricsSupplier = metricsSupplier;
this.extraTags = extraTags;
int i = 0;
for (Tag ignored : extraTags) i++;
this.extraTagsSize = i + 1; // 1 = kafka version tag
}

@Override public void bindTo(MeterRegistry registry) {
Map<MetricName, ? extends Metric> metrics = metricsSupplier.get();
// Collect static metrics and tags
Metric startTimeMetric = null;
Metric startTime = null;
for (Map.Entry<MetricName, ? extends Metric> entry: metrics.entrySet()) {
MetricName name = entry.getKey();
if (METRIC_GROUP_APP_INFO.equals(name.group())) {
if (METRIC_GROUP_APP_INFO.equals(name.group()))
if (VERSION_METRIC_NAME.equals(name.name()))
kafkaVersion = (String) entry.getValue().metricValue();
else if (START_TIME_METRIC_NAME.equals(name.name()))
startTimeMetric = entry.getValue();
}
startTime = entry.getValue();
}
if (startTimeMetric != null) bindMeter(registry, startTimeMetric);
if (startTime != null) bindMeter(registry, startTime, meterName(startTime), meterTags(startTime));
// Collect dynamic metrics
checkAndBindMetrics(registry);
}
Expand All @@ -113,48 +107,47 @@ void checkAndBindMetrics(MeterRegistry registry) {
//Filter out metrics from groups that includes metadata
if (METRIC_GROUP_APP_INFO.equals(name.group())) return;
if (METRIC_GROUP_METRICS_COUNT.equals(name.group())) return;
String meterName = meterName(metric);
List<Tag> meterTags = meterTags(metric);
//Kafka has metrics with lower number of tags (e.g. with/without topic or partition tag)
//Remove meters with lower number of tags
boolean hasLessTags = false;
Collection<Meter> meters = registry.find(metricName(metric)).meters();
for (Meter meter : meters) {
if (meter.getId().getTags().size() < (metricTags(metric).size() + extraTagsSize))
registry.remove(meter);
for (Meter other : registry.find(meterName).meters()) {
List<Tag> tags = other.getId().getTags();
if (tags.size() < meterTags.size()) registry.remove(other);
// Check if already exists
else if (meter.getId().getTags().equals(metricTags(metric))) return;
else if (tags.size() == meterTags.size())
if (tags.equals(meterTags)) return;
else break;
else hasLessTags = true;
}
if (hasLessTags) return;
//Filter out non-numeric values
if (!(metric.metricValue() instanceof Number)) return;
bindMeter(registry, metric);
bindMeter(registry, metric, meterName, meterTags);
});
}
}
}
}

private void bindMeter(MeterRegistry registry, Metric metric) {
String name = metricName(metric);
if (name.endsWith("total") || name.endsWith("count"))
registerCounter(registry, metric, name, extraTags);
private void bindMeter(MeterRegistry registry, Metric metric, String name, Iterable<Tag> tags) {
if (name.endsWith("total") || name.endsWith("count")) registerCounter(registry, metric, name, tags);
else if (name.endsWith("min") || name.endsWith("max") || name.endsWith("avg") || name.endsWith("rate"))
registerGauge(registry, metric, name, extraTags);
else registerGauge(registry, metric, name, extraTags);
registerGauge(registry, metric, name, tags);
else registerGauge(registry, metric, name, tags);
}

private void registerGauge(MeterRegistry registry, Metric metric, String metricName, Iterable<Tag> extraTags) {
Gauge.builder(metricName, metric, toMetricValue(registry))
.tags(metricTags(metric))
.tags(extraTags)
private void registerGauge(MeterRegistry registry, Metric metric, String name, Iterable<Tag> tags) {
Gauge.builder(name, metric, toMetricValue(registry))
.tags(tags)
.description(metric.metricName().description())
.register(registry);
}

private void registerCounter(MeterRegistry registry, Metric metric, String metricName, Iterable<Tag> extraTags) {
FunctionCounter.builder(metricName, metric, toMetricValue(registry))
.tags(metricTags(metric))
.tags(extraTags)
private void registerCounter(MeterRegistry registry, Metric metric, String name, Iterable<Tag> tags) {
FunctionCounter.builder(name, metric, toMetricValue(registry))
.tags(tags)
.description(metric.metricName().description())
.register(registry);
}
Expand All @@ -167,14 +160,15 @@ private ToDoubleFunction<Metric> toMetricValue(MeterRegistry registry) {
};
}

private List<Tag> metricTags(Metric metric) {
private List<Tag> meterTags(Metric metric) {
List<Tag> tags = new ArrayList<>();
tags.add(Tag.of("kafka-version", kafkaVersion));
metric.metricName().tags().forEach((key, value) -> tags.add(Tag.of(key, value)));
tags.add(Tag.of("kafka-version", kafkaVersion));
extraTags.forEach(tags::add);
return tags;
}

private String metricName(Metric metric) {
private String meterName(Metric metric) {
String name = METRIC_NAME_PREFIX + metric.metricName().group() + "." + metric.metricName().name();
return name.replaceAll("-metrics", "").replaceAll("-", ".");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,31 @@ class KafkaMetricsTest {
Meter meter = registry.getMeters().get(0);
assertThat(meter.getId().getTags()).hasSize(2); // version + key0
}

@Test void shouldBindMetersWithSameTags() {
//Given
Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
Map<String, String> firstTags = new LinkedHashMap<>();
firstTags.put("key0", "value0");
MetricName firstName = new MetricName("a", "b", "c", firstTags);
KafkaMetric firstMetric = new KafkaMetric(this, firstName, new Value(), new MetricConfig(), Time.SYSTEM);
Map<String, String> secondTags = new LinkedHashMap<>();
secondTags.put("key0", "value1");
MetricName secondName = new MetricName("a", "b", "c", secondTags);
KafkaMetric secondMetric = new KafkaMetric(this, secondName, new Value(), new MetricConfig(), Time.SYSTEM);

Map<MetricName, KafkaMetric> metrics = new LinkedHashMap<>();
metrics.put(firstName, firstMetric);
metrics.put(secondName, secondMetric);
return metrics;
};
KafkaMetrics kafkaMetrics = new KafkaMetrics(supplier);
MeterRegistry registry = new SimpleMeterRegistry();
//When
kafkaMetrics.checkAndBindMetrics(registry);
//Then
assertThat(registry.getMeters()).hasSize(2);
Meter meter = registry.getMeters().get(0);
assertThat(meter.getId().getTags()).hasSize(2); // version + key0
}
}