Skip to content

Commit

Permalink
refactor: simplify currentMeters ref
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Mar 10, 2020
1 parent a121051 commit 1c3776b
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ void checkAndBindMetrics(MeterRegistry registry) {
synchronized (this) { //Enforce only happens once when metrics change
if (!currentMeters.equals(metrics.keySet())) {
//Register meters
currentMeters = new HashSet<>();
currentMeters = new HashSet<>(metrics.keySet());
metrics.forEach((name, metric) -> {
//Filter out metrics from group "app-info", that includes metadata
if (METRIC_GROUP_APP_INFO.equals(name.group())) return;
Expand All @@ -124,15 +124,14 @@ void checkAndBindMetrics(MeterRegistry registry) {
for (Meter meter : meters) {
if (meter.getId().getTags().size() < (metricTags(metric).size() + extraTagsSize))
registry.remove(meter);
// if already exists
// Check if already exists
else if (meter.getId().getTags().equals(metricTags(metric))) return;
else hasLessTags = true;
}
if (hasLessTags) return;
//Filter out non-numeric values
if (!isNumber(metric)) return;
bindMeter(registry, metric);
currentMeters.add(name);
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
Expand Down Expand Up @@ -57,23 +58,27 @@ class KafkaMetricsTest {

@Test void shouldAddNewMetersWhenMetricsChange() {
//Given
Map<MetricName, KafkaMetric> metrics = new LinkedHashMap<>();
Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
AtomicReference<Map<MetricName, KafkaMetric>> metrics = new AtomicReference<>();
metrics.set(new LinkedHashMap<>());
Supplier<Map<MetricName, ? extends Metric>> supplier = () -> metrics.updateAndGet(map -> {
MetricName metricName = new MetricName("a0", "b0", "c0", new LinkedHashMap<>());
KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM);
metrics.put(metricName, metric);
return metrics;
};
map.put(metricName, metric);
return map;
});
KafkaMetrics kafkaMetrics = new KafkaMetrics(supplier);
MeterRegistry registry = new SimpleMeterRegistry();
//When
kafkaMetrics.checkAndBindMetrics(registry);
//Then
assertThat(registry.getMeters()).hasSize(1);
//Given
MetricName metricName = new MetricName("a1", "b1", "c1", new LinkedHashMap<>());
KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM);
metrics.put(metricName, metric);
metrics.updateAndGet(map -> {
MetricName metricName = new MetricName("a1", "b1", "c1", new LinkedHashMap<>());
KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM);
map.put(metricName, metric);
return map;
});
//When
kafkaMetrics.checkAndBindMetrics(registry);
//Then
Expand Down

0 comments on commit 1c3776b

Please sign in to comment.