From abc0b30e4712d4a9d37cbafa7a41639d2af5c1d1 Mon Sep 17 00:00:00 2001 From: Jorge Quilcate Otoya Date: Tue, 10 Mar 2020 15:43:57 +0100 Subject: [PATCH] Kafka meter binder without JMX (#1835) Provides a replacement for the `KafkaConsumerMetrics` binder (uses JMX) - `KafkaClientMetrics` and `KafkaStreamsMetrics` binders based off metrics provided by the Kafka client API. This allows all metrics provided by the client to be registered as Micrometer metrics, with the caveat that metrics with the same name but less tags will not be registered. For some metrics, Kafka provides a total metric and one broken down by things like topic and partition. Micrometer will opt to register the most specific ones. Resolves #1095 Resolves #1096 --- dependencies.gradle | 7 +- micrometer-core/build.gradle | 8 + .../binder/kafka/KafkaClientMetrics.java | 102 +++++++++ .../binder/kafka/KafkaConsumerMetrics.java | 1 + .../instrument/binder/kafka/KafkaMetrics.java | 198 ++++++++++++++++++ .../binder/kafka/KafkaStreamsMetrics.java | 60 ++++++ .../binder/db/MetricsDSLContextTest.java | 2 + .../binder/kafka/KafkaAdminMetricsTest.java | 65 ++++++ .../binder/kafka/KafkaClientMetricsIT.java | 116 ++++++++++ .../kafka/KafkaConsumerMetricsTest.java | 144 +++---------- .../binder/kafka/KafkaMetricsTest.java | 164 +++++++++++++++ .../kafka/KafkaProducerMetricsTest.java | 71 +++++++ .../binder/kafka/KafkaStreamsMetricsTest.java | 70 +++++++ .../kafka/OldKafkaConsumerMetricsTest.java | 158 ++++++++++++++ 14 files changed, 1050 insertions(+), 116 deletions(-) create mode 100644 micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetrics.java create mode 100644 micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaMetrics.java create mode 100644 micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaStreamsMetrics.java create mode 100644 
micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaAdminMetricsTest.java create mode 100644 micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsIT.java create mode 100644 micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaMetricsTest.java create mode 100644 micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaProducerMetricsTest.java create mode 100644 micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaStreamsMetricsTest.java create mode 100644 micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/OldKafkaConsumerMetricsTest.java diff --git a/dependencies.gradle b/dependencies.gradle index 36f15a5a1c..9287fc1111 100644 --- a/dependencies.gradle +++ b/dependencies.gradle @@ -39,8 +39,8 @@ def VERSIONS = [ 'net.sf.ehcache:ehcache:latest.release', 'org.apache.httpcomponents:httpasyncclient:latest.release', 'org.apache.httpcomponents:httpclient:latest.release', - // Pin version temporarily to restore builds. 
See gh-1756 - 'org.apache.kafka:kafka-clients:2.3.1', + 'org.apache.kafka:kafka-clients:latest.release', + 'org.apache.kafka:kafka-streams:latest.release', 'org.apache.logging.log4j:log4j-core:2.+', 'org.apache.tomcat.embed:tomcat-embed-core:8.+', 'org.aspectj:aspectjweaver:1.8.+', @@ -69,6 +69,9 @@ def VERSIONS = [ 'org.springframework:spring-test:4.+', 'ru.lanwen.wiremock:wiremock-junit5:latest.release', 'software.amazon.awssdk:cloudwatch:latest.release', + 'org.testcontainers:testcontainers:latest.release', + 'org.testcontainers:junit-jupiter:latest.release', + 'org.testcontainers:kafka:latest.release' ] subprojects { diff --git a/micrometer-core/build.gradle b/micrometer-core/build.gradle index 913974b614..0464e0377b 100644 --- a/micrometer-core/build.gradle +++ b/micrometer-core/build.gradle @@ -48,6 +48,9 @@ dependencies { optionalApi 'org.jooq:jooq' + optionalApi 'org.apache.kafka:kafka-clients' + optionalApi 'org.apache.kafka:kafka-streams' + testImplementation 'io.projectreactor:reactor-test' // JUnit 5 @@ -86,4 +89,9 @@ dependencies { testImplementation 'com.github.tomakehurst:wiremock-jre8' testImplementation 'de.flapdoodle.embed:de.flapdoodle.embed.mongo' + + // Kafka binder IT dependencies + testImplementation 'org.testcontainers:testcontainers' + testImplementation 'org.testcontainers:junit-jupiter' + testImplementation 'org.testcontainers:kafka' } diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetrics.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetrics.java new file mode 100644 index 0000000000..5179fcc27f --- /dev/null +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetrics.java @@ -0,0 +1,102 @@ +/** + * Copyright 2020 Pivotal Software, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * https://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.core.instrument.binder.kafka; + +import io.micrometer.core.annotation.Incubating; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.binder.MeterBinder; +import io.micrometer.core.lang.NonNullApi; +import io.micrometer.core.lang.NonNullFields; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.Metric; + +/** + * Kafka Client metrics binder. + *

+ * It is based on {@code metrics()} method returning {@link Metric} map exposed by clients and + * streams interface. + *

+ * Meter names have the following convention: {@code kafka.(metric_group).(metric_name)} + * + * @author Jorge Quilcate + * @see Kakfa monitoring + * documentation + * @since 1.4.0 + */ +@Incubating(since = "1.4.0") +@NonNullApi +@NonNullFields +public class KafkaClientMetrics extends KafkaMetrics implements MeterBinder { + + /** + * Kafka {@link Producer} metrics binder + * + * @param kafkaProducer producer instance to be instrumented + * @param tags additional tags + */ + public KafkaClientMetrics(Producer kafkaProducer, Iterable tags) { + super(kafkaProducer::metrics, tags); + } + + /** + * Kafka {@link Producer} metrics binder + * + * @param kafkaProducer producer instance to be instrumented + */ + public KafkaClientMetrics(Producer kafkaProducer) { + super(kafkaProducer::metrics); + } + + /** + * Kafka {@link Consumer} metrics binder + * + * @param kafkaConsumer consumer instance to be instrumented + * @param tags additional tags + */ + public KafkaClientMetrics(Consumer kafkaConsumer, Iterable tags) { + super(kafkaConsumer::metrics, tags); + } + + /** + * Kafka {@link Consumer} metrics binder + * + * @param kafkaConsumer consumer instance to be instrumented + */ + public KafkaClientMetrics(Consumer kafkaConsumer) { + super(kafkaConsumer::metrics); + } + + /** + * Kafka {@link AdminClient} metrics binder + * + * @param adminClient instance to be instrumented + * @param tags additional tags + */ + public KafkaClientMetrics(AdminClient adminClient, Iterable tags) { + super(adminClient::metrics, tags); + } + + /** + * Kafka {@link AdminClient} metrics binder + * + * @param adminClient instance to be instrumented + */ + public KafkaClientMetrics(AdminClient adminClient) { + super(adminClient::metrics); + } +} diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaConsumerMetrics.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaConsumerMetrics.java index 3d85318e4d..3e5c949dbe 100644 --- 
a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaConsumerMetrics.java +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaConsumerMetrics.java @@ -53,6 +53,7 @@ @Incubating(since = "1.1.0") @NonNullApi @NonNullFields +@Deprecated public class KafkaConsumerMetrics implements MeterBinder, AutoCloseable { private static final String JMX_DOMAIN = "kafka.consumer"; private static final String METRIC_NAME_PREFIX = "kafka.consumer."; diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaMetrics.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaMetrics.java new file mode 100644 index 0000000000..1c668f8d42 --- /dev/null +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaMetrics.java @@ -0,0 +1,198 @@ +/** + * Copyright 2020 Pivotal Software, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * https://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.core.instrument.binder.kafka; + +import io.micrometer.core.annotation.Incubating; +import io.micrometer.core.instrument.FunctionCounter; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.binder.MeterBinder; +import io.micrometer.core.lang.NonNullApi; +import io.micrometer.core.lang.NonNullFields; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; +import java.util.function.ToDoubleFunction; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; + +import static java.util.Collections.emptyList; + +/** + * Kafka metrics binder. + *

+ * It is based on {@code metrics()} method returning {@link Metric} map exposed by clients and + * streams interface. + *

+ * Meter names have the following convention: {@code kafka.(metric_group).(metric_name)} + * + * @author Jorge Quilcate + * @see Kakfa monitoring + * documentation + * @since 1.4.0 + */ +@Incubating(since = "1.4.0") +@NonNullApi +@NonNullFields +class KafkaMetrics implements MeterBinder { + static final String METRIC_NAME_PREFIX = "kafka."; + static final String METRIC_GROUP_APP_INFO = "app-info"; + static final String METRIC_GROUP_METRICS_COUNT = "kafka-metrics-count"; + static final String VERSION_METRIC_NAME = "version"; + static final String START_TIME_METRIC_NAME = "start-time-ms"; + + private final Supplier> metricsSupplier; + private final Iterable extraTags; + private final int extraTagsSize; + + /** + * Keeps track of current set of metrics. When this values change, metrics are bound again. + */ + private volatile Set currentMeters = new HashSet<>(); + + private String kafkaVersion = "unknown"; + + KafkaMetrics(Supplier> metricsSupplier) { + this(metricsSupplier, emptyList()); + } + + KafkaMetrics(Supplier> metricsSupplier, Iterable extraTags) { + this.metricsSupplier = metricsSupplier; + this.extraTags = extraTags; + int i = 0; + for (Tag ignored : extraTags) i++; + this.extraTagsSize = i + 1; // 1 = kafka version tag + } + + @Override public void bindTo(MeterRegistry registry) { + Map metrics = metricsSupplier.get(); + // Collect static metrics and tags + Metric startTimeMetric = null; + for (Map.Entry entry: metrics.entrySet()) { + MetricName name = entry.getKey(); + if (METRIC_GROUP_APP_INFO.equals(name.group())) { + if (VERSION_METRIC_NAME.equals(name.name())) + kafkaVersion = (String) entry.getValue().metricValue(); + else if (START_TIME_METRIC_NAME.equals(name.name())) + startTimeMetric = entry.getValue(); + } + } + if (startTimeMetric != null) bindMeter(registry, startTimeMetric); + // Collect dynamic metrics + checkAndBindMetrics(registry); + } + + /** + * Gather metrics from Kafka metrics API and register Meters. + *

+ * As this is a one-off execution when binding a Kafka client, Meters include a call to this + * validation to double-check new metrics when returning values. This should only add the cost of + * validating meters registered counter when no new meters are present. + */ + void checkAndBindMetrics(MeterRegistry registry) { + Map metrics = metricsSupplier.get(); + if (!currentMeters.equals(metrics.keySet())) { + synchronized (this) { //Enforce only happens once when metrics change + if (!currentMeters.equals(metrics.keySet())) { + //Register meters + currentMeters = new HashSet<>(metrics.keySet()); + metrics.forEach((name, metric) -> { + //Filter out metrics from groups that includes metadata + if (METRIC_GROUP_APP_INFO.equals(name.group())) return; + if (METRIC_GROUP_METRICS_COUNT.equals(name.group())) return; + //Kafka has metrics with lower number of tags (e.g. with/without topic or partition tag) + //Remove meters with lower number of tags + boolean hasLessTags = false; + Collection meters = registry.find(metricName(metric)).meters(); + for (Meter meter : meters) { + if (meter.getId().getTags().size() < (metricTags(metric).size() + extraTagsSize)) + registry.remove(meter); + // Check if already exists + else if (meter.getId().getTags().equals(metricTags(metric))) return; + else hasLessTags = true; + } + if (hasLessTags) return; + //Filter out non-numeric values + if (!isNumber(metric)) return; + bindMeter(registry, metric); + }); + } + } + } + } + + private boolean isNumber(Metric metric) { + return metric.metricValue() instanceof Double + || metric.metricValue() instanceof Float + || metric.metricValue() instanceof Integer + || metric.metricValue() instanceof Long; + } + + private void bindMeter(MeterRegistry registry, Metric metric) { + String name = metricName(metric); + if (name.endsWith("total") || name.endsWith("count")) + registerCounter(registry, metric, name, extraTags); + else if (name.endsWith("min") || name.endsWith("max") || name.endsWith("avg") || 
name.endsWith("rate")) + registerGauge(registry, metric, name, extraTags); + else registerGauge(registry, metric, name, extraTags); + } + + private void registerGauge(MeterRegistry registry, Metric metric, String metricName, Iterable extraTags) { + Gauge.builder(metricName, metric, toMetricValue(registry)) + .tags(metricTags(metric)) + .tags(extraTags) + .description(metric.metricName().description()) + .register(registry); + } + + private void registerCounter(MeterRegistry registry, Metric metric, String metricName, Iterable extraTags) { + FunctionCounter.builder(metricName, metric, toMetricValue(registry)) + .tags(metricTags(metric)) + .tags(extraTags) + .description(metric.metricName().description()) + .register(registry); + } + + private ToDoubleFunction toMetricValue(MeterRegistry registry) { + return metric -> { + //Double-check if new metrics are registered; if not (common scenario) + //it only adds metrics count validation + checkAndBindMetrics(registry); + if (metric.metricValue() instanceof Double) return (double) metric.metricValue(); + else if (metric.metricValue() instanceof Integer) return ((Integer) metric.metricValue()).doubleValue(); + else if (metric.metricValue() instanceof Long) return ((Long) metric.metricValue()).doubleValue(); + else return ((Float) metric.metricValue()).doubleValue(); + }; + } + + private List metricTags(Metric metric) { + List tags = new ArrayList<>(); + tags.add(Tag.of("kafka-version", kafkaVersion)); + metric.metricName().tags().forEach((key, value) -> tags.add(Tag.of(key, value))); + return tags; + } + + private String metricName(Metric metric) { + String name = METRIC_NAME_PREFIX + metric.metricName().group() + "." 
+ metric.metricName().name(); + return name.replaceAll("-metrics", "").replaceAll("-", "."); + } +} diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaStreamsMetrics.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaStreamsMetrics.java new file mode 100644 index 0000000000..4c656951cc --- /dev/null +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaStreamsMetrics.java @@ -0,0 +1,60 @@ +/** + * Copyright 2020 Pivotal Software, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + *

+ * https://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.micrometer.core.instrument.binder.kafka; + +import io.micrometer.core.annotation.Incubating; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.binder.MeterBinder; +import io.micrometer.core.lang.NonNullApi; +import io.micrometer.core.lang.NonNullFields; +import org.apache.kafka.common.Metric; +import org.apache.kafka.streams.KafkaStreams; + +/** + * Kafka Streams metrics binder. + *

+ * It is based on {@code metrics()} method returning {@link Metric} map exposed by clients and + * streams interface. + *

+ * Meter names have the following convention: {@code kafka.(metric_group).(metric_name)} + * + * @author Jorge Quilcate + * @see Kafka monitoring + * documentation + * @since 1.4.0 + */ +@Incubating(since = "1.4.0") +@NonNullApi +@NonNullFields +public class KafkaStreamsMetrics extends KafkaMetrics implements MeterBinder { + + /** + * {@link KafkaStreams} metrics binder + * + * @param kafkaStreams instance to be instrumented + * @param tags additional tags + */ + public KafkaStreamsMetrics(KafkaStreams kafkaStreams, Iterable tags) { + super(kafkaStreams::metrics, tags); + } + + /** + * {@link KafkaStreams} metrics binder + * + * @param kafkaStreams instance to be instrumented + */ + public KafkaStreamsMetrics(KafkaStreams kafkaStreams) { + super(kafkaStreams::metrics); + } +} diff --git a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/db/MetricsDSLContextTest.java b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/db/MetricsDSLContextTest.java index 51d274d5ca..b3ee6a9cab 100644 --- a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/db/MetricsDSLContextTest.java +++ b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/db/MetricsDSLContextTest.java @@ -18,6 +18,7 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tags; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import io.micrometer.core.lang.NonNull; import org.jooq.*; import org.jooq.exception.DataAccessException; import org.jooq.impl.DSL; @@ -177,6 +178,7 @@ void timeTwoStatementsCreatedBeforeEitherIsExecuted() throws SQLException { } } + @NonNull private MetricsDSLContext createDatabase(Connection conn) { Configuration configuration = new DefaultConfiguration() .set(conn) diff --git a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaAdminMetricsTest.java 
b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaAdminMetricsTest.java new file mode 100644 index 0000000000..a5df5fd062 --- /dev/null +++ b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaAdminMetricsTest.java @@ -0,0 +1,65 @@ +/** + * Copyright 2020 Pivotal Software, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * https://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.core.instrument.binder.kafka; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import java.util.Properties; +import org.apache.kafka.clients.admin.AdminClient; +import org.junit.jupiter.api.Test; + +import static io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics.METRIC_NAME_PREFIX; +import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.assertj.core.api.Assertions.assertThat; + +class KafkaAdminMetricsTest { + private final static String BOOTSTRAP_SERVERS = "localhost:9092"; + private Tags tags = Tags.of("app", "myapp", "version", "1"); + + @Test void shouldCreateMeters() { + try (AdminClient adminClient = createAdmin()) { + KafkaClientMetrics metrics = new KafkaClientMetrics(adminClient); + MeterRegistry registry = new SimpleMeterRegistry(); + + metrics.bindTo(registry); + assertThat(registry.getMeters()) + .hasSizeGreaterThan(0) + .extracting(meter -> meter.getId().getName()) + .allMatch(s -> s.startsWith(METRIC_NAME_PREFIX)); + } + } + + @Test void shouldCreateMetersWithTags() { + try (AdminClient adminClient = createAdmin()) { + KafkaClientMetrics metrics = new KafkaClientMetrics(adminClient, tags); + MeterRegistry registry = new SimpleMeterRegistry(); + + metrics.bindTo(registry); + + assertThat(registry.getMeters()) + .hasSizeGreaterThan(0) + .extracting(meter -> meter.getId().getTag("app")) + .allMatch(s -> s.equals("myapp")); + } + } + + private AdminClient createAdmin() { + Properties adminConfig = new Properties(); + 
adminConfig.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + return AdminClient.create(adminConfig); + } +} diff --git a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsIT.java b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsIT.java new file mode 100644 index 0000000000..7ed90268a7 --- /dev/null +++ b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsIT.java @@ -0,0 +1,116 @@ +/** + * Copyright 2020 Pivotal Software, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * https://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.core.instrument.binder.kafka; + +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import static java.lang.System.out; +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +@Tag("docker") +class KafkaClientMetricsIT { + @Container + private KafkaContainer kafkaContainer = new KafkaContainer(); + + @Test + void shouldManageProducerAndConsumerMetrics() { + SimpleMeterRegistry registry = new SimpleMeterRegistry(); + + assertThat(registry.getMeters()).hasSize(0); + + Properties producerConfigs = new Properties(); + producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + kafkaContainer.getBootstrapServers()); + Producer producer = new KafkaProducer<>( + producerConfigs, new StringSerializer(), new StringSerializer()); + + new 
KafkaClientMetrics(producer).bindTo(registry); + + int producerMetrics = registry.getMeters().size(); + assertThat(registry.getMeters()).hasSizeGreaterThan(0); + assertThat(registry.getMeters()) + .extracting(m -> m.getId().getTag("kafka-version")) + .allMatch(v -> !v.isEmpty()); + + Properties consumerConfigs = new Properties(); + consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + kafkaContainer.getBootstrapServers()); + consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); + Consumer consumer = new KafkaConsumer<>( + consumerConfigs, new StringDeserializer(), new StringDeserializer()); + + new KafkaClientMetrics(consumer).bindTo(registry); + + //Printing out for discovery purposes + out.println("Meters from producer before sending:"); + registry.getMeters().forEach(meter -> out.println(meter.getId() + " => " + meter.measure())); + + int producerAndConsumerMetrics = registry.getMeters().size(); + assertThat(registry.getMeters()).hasSizeGreaterThan(producerMetrics); + assertThat(registry.getMeters()) + .extracting(m -> m.getId().getTag("kafka-version")) + .allMatch(v -> !v.isEmpty()); + + String topic = "test"; + producer.send(new ProducerRecord<>(topic, "key", "value")); + producer.flush(); + + //Printing out for discovery purposes + out.println("Meters from producer after sending and consumer before poll:"); + registry.getMeters().forEach(meter -> out.println(meter.getId() + " => " + meter.measure())); + + int producerAndConsumerMetricsAfterSend = registry.getMeters().size(); + assertThat(registry.getMeters()).hasSizeGreaterThan(producerAndConsumerMetrics); + assertThat(registry.getMeters()) + .extracting(m -> m.getId().getTag("kafka-version")) + .allMatch(v -> !v.isEmpty()); + + consumer.subscribe(Collections.singletonList(topic)); + + consumer.poll(Duration.ofMillis(100)); + + //Printing out for discovery purposes + out.println("Meters from producer and consumer after polling:"); + registry.getMeters().forEach(meter -> 
out.println(meter.getId() + " => " + meter.measure())); + + assertThat(registry.getMeters()).hasSizeGreaterThan(producerAndConsumerMetricsAfterSend); + assertThat(registry.getMeters()) + .extracting(m -> m.getId().getTag("kafka-version")) + .allMatch(v -> !v.isEmpty()); + + //Printing out for discovery purposes + out.println("All meters from producer and consumer:"); + registry.getMeters().forEach(meter -> out.println(meter.getId() + " => " + meter.measure())); + } +} diff --git a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaConsumerMetricsTest.java b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaConsumerMetricsTest.java index 62f1e523e4..5653c45612 100644 --- a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaConsumerMetricsTest.java +++ b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaConsumerMetricsTest.java @@ -1,5 +1,5 @@ /** - * Copyright 2018 Pivotal Software, Inc. + * Copyright 2020 Pivotal Software, Inc. *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,143 +15,59 @@ */ package io.micrometer.core.instrument.binder.kafka; -import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tags; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import java.util.Properties; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.jupiter.api.Test; -import java.util.Collections; -import java.util.Properties; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - +import static io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics.METRIC_NAME_PREFIX; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.assertj.core.api.Assertions.assertThat; -/** - * Tests for {@link KafkaConsumerMetrics}. 
- * - * @author Jon Schneider - * @author Johnny Lim - */ class KafkaConsumerMetricsTest { - private final static String TOPIC = "my-example-topic"; private final static String BOOTSTRAP_SERVERS = "localhost:9092"; - private static int consumerCount = 0; - private Tags tags = Tags.of("app", "myapp", "version", "1"); - private KafkaConsumerMetrics kafkaConsumerMetrics = new KafkaConsumerMetrics(tags); - - @Test - void verifyConsumerMetricsWithExpectedTags() { - try (Consumer consumer = createConsumer()) { + @Test void shouldCreateMeters() { + try (Consumer consumer = createConsumer()) { + KafkaClientMetrics metrics = new KafkaClientMetrics(consumer); MeterRegistry registry = new SimpleMeterRegistry(); - kafkaConsumerMetrics.bindTo(registry); - - // consumer coordinator metrics - Gauge assignedPartitions = registry.get("kafka.consumer.assigned.partitions").tags(tags).gauge(); - assertThat(assignedPartitions.getId().getTag("client.id")).isEqualTo("consumer-" + consumerCount); - // global connection metrics - Gauge connectionCount = registry.get("kafka.consumer.connection.count").tags(tags).gauge(); - assertThat(connectionCount.getId().getTag("client.id")).startsWith("consumer-" + consumerCount); + metrics.bindTo(registry); + assertThat(registry.getMeters()) + .hasSizeGreaterThan(0) + .extracting(meter -> meter.getId().getName()) + .allMatch(s -> s.startsWith(METRIC_NAME_PREFIX)); } } - @Test - void metricsReportedPerMultipleConsumers() { - try (Consumer consumer = createConsumer(); - Consumer consumer2 = createConsumer()) { - + @Test void shouldCreateMetersWithTags() { + try (Consumer consumer = createConsumer()) { + KafkaClientMetrics metrics = new KafkaClientMetrics(consumer, tags); MeterRegistry registry = new SimpleMeterRegistry(); - kafkaConsumerMetrics.bindTo(registry); - - // fetch metrics - registry.get("kafka.consumer.fetch.total").tag("client.id", "consumer-" + consumerCount).functionCounter(); - registry.get("kafka.consumer.fetch.total").tag("client.id", 
"consumer-" + (consumerCount - 1)).functionCounter(); - } - } - - @Test - void newConsumersAreDiscoveredByListener() throws InterruptedException { - MeterRegistry registry = new SimpleMeterRegistry(); - kafkaConsumerMetrics.bindTo(registry); - - CountDownLatch latch = new CountDownLatch(1); - registry.config().onMeterAdded(m -> { - if (m.getId().getName().contains("kafka")) - latch.countDown(); - }); - - try (Consumer consumer = createConsumer()) { - latch.await(10, TimeUnit.SECONDS); - } - } - - @Test - void verifyKafkaMajorVersion() { - try (Consumer consumer = createConsumer()) { - Tags tags = Tags.of("client.id", "consumer-" + consumerCount); - assertThat(kafkaConsumerMetrics.kafkaMajorVersion(tags)).isGreaterThanOrEqualTo(2); - } - } - - @Test - void returnsNegativeKafkaMajorVersionWhenMBeanInstanceNotFound() { - try (Consumer consumer = createConsumer()) { - Tags tags = Tags.of("client.id", "invalid"); - assertThat(kafkaConsumerMetrics.kafkaMajorVersion(tags)).isEqualTo(-1); - } - } - - @Test - void returnsNegativeKafkaMajorVersionForEmptyTags() { - try (Consumer consumer = createConsumer()) { - assertThat(kafkaConsumerMetrics.kafkaMajorVersion(Tags.empty())).isEqualTo(-1); - } - } - - @Test - void consumerBeforeBindingWhenClosedShouldRemoveMeters() { - MeterRegistry registry = new SimpleMeterRegistry(); - try (Consumer consumer = createConsumer()) { - kafkaConsumerMetrics.bindTo(registry); - - Gauge gauge = registry.get("kafka.consumer.assigned.partitions").gauge(); - assertThat(gauge.getId().getTag("client.id")).isEqualTo("consumer-" + consumerCount); - } - assertThat(registry.find("kafka.consumer.assigned.partitions").gauge()).isNull(); - } - @Test - void consumerAfterBindingWhenClosedShouldRemoveMeters() { - MeterRegistry registry = new SimpleMeterRegistry(); - kafkaConsumerMetrics.bindTo(registry); + metrics.bindTo(registry); - try (Consumer consumer = createConsumer()) { - Gauge gauge = registry.get("kafka.consumer.assigned.partitions").gauge(); - 
assertThat(gauge.getId().getTag("client.id")).isEqualTo("consumer-" + consumerCount); + assertThat(registry.getMeters()) + .hasSizeGreaterThan(0) + .extracting(meter -> meter.getId().getTag("app")) + .allMatch(s -> s.equals("myapp")); } - assertThat(registry.find("kafka.consumer.assigned.partitions").gauge()).isNull(); } - private Consumer createConsumer() { - Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "MicrometerTestConsumer"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - - Consumer consumer = new KafkaConsumer<>(props); - consumer.subscribe(Collections.singletonList(TOPIC)); - consumerCount++; - return consumer; + private Consumer createConsumer() { + Properties consumerConfig = new Properties(); + consumerConfig.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + consumerConfig.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerConfig.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerConfig.put(GROUP_ID_CONFIG, "group"); + return new KafkaConsumer<>(consumerConfig); } - } diff --git a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaMetricsTest.java b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaMetricsTest.java new file mode 100644 index 0000000000..678e5a9ea1 --- /dev/null +++ b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaMetricsTest.java @@ -0,0 +1,164 @@ +/** + * Copyright 2020 Pivotal Software, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * https://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.core.instrument.binder.kafka; + +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.stats.Value; +import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class KafkaMetricsTest { + + @Test void shouldKeepMetersWhenMetricsDoNotChange() { + //Given + Supplier> supplier = () -> { + Map metrics = new LinkedHashMap<>(); + MetricName metricName = new MetricName("a", "b", "c", new LinkedHashMap<>()); + KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM); + metrics.put(metricName, metric); + return metrics; + }; + KafkaMetrics kafkaMetrics = new KafkaMetrics(supplier); + MeterRegistry registry = new SimpleMeterRegistry(); + //When + kafkaMetrics.checkAndBindMetrics(registry); + //Then + assertThat(registry.getMeters()).hasSize(1); + //When + kafkaMetrics.checkAndBindMetrics(registry); + //Then + assertThat(registry.getMeters()).hasSize(1); + } + + @Test void shouldAddNewMetersWhenMetricsChange() { + //Given + AtomicReference> metrics = new AtomicReference<>(); + metrics.set(new 
LinkedHashMap<>()); + Supplier> supplier = () -> metrics.updateAndGet(map -> { + MetricName metricName = new MetricName("a0", "b0", "c0", new LinkedHashMap<>()); + KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM); + map.put(metricName, metric); + return map; + }); + KafkaMetrics kafkaMetrics = new KafkaMetrics(supplier); + MeterRegistry registry = new SimpleMeterRegistry(); + //When + kafkaMetrics.checkAndBindMetrics(registry); + //Then + assertThat(registry.getMeters()).hasSize(1); + //Given + metrics.updateAndGet(map -> { + MetricName metricName = new MetricName("a1", "b1", "c1", new LinkedHashMap<>()); + KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM); + map.put(metricName, metric); + return map; + }); + //When + kafkaMetrics.checkAndBindMetrics(registry); + //Then + assertThat(registry.getMeters()).hasSize(2); + } + + @Test void shouldNotAddAppInfoMetrics() { + //Given + Map metrics = new LinkedHashMap<>(); + Supplier> supplier = () -> { + MetricName metricName = new MetricName("a0", "b0", "c0", new LinkedHashMap<>()); + KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM); + metrics.put(metricName, metric); + MetricName appInfoMetricName = + new MetricName("a1", KafkaMetrics.METRIC_GROUP_APP_INFO, "c0", + new LinkedHashMap<>()); + KafkaMetric appInfoMetric = + new KafkaMetric(this, appInfoMetricName, new Value(), new MetricConfig(), Time.SYSTEM); + metrics.put(appInfoMetricName, appInfoMetric); + return metrics; + }; + KafkaMetrics kafkaMetrics = new KafkaMetrics(supplier); + MeterRegistry registry = new SimpleMeterRegistry(); + //When + kafkaMetrics.checkAndBindMetrics(registry); + //Then + assertThat(registry.getMeters()).hasSize(1); + //When + kafkaMetrics.checkAndBindMetrics(registry); + //Then + assertThat(registry.getMeters()).hasSize(1); + } + + @Test void shouldRemoveNewerMeterWithLessTags() { + 
//Given + Map tags = new LinkedHashMap<>(); + Supplier> supplier = () -> { + Map metrics = new LinkedHashMap<>(); + MetricName metricName = new MetricName("a", "b", "c", tags); + KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM); + metrics.put(metricName, metric); + return metrics; + }; + KafkaMetrics kafkaMetrics = new KafkaMetrics(supplier); + MeterRegistry registry = new SimpleMeterRegistry(); + //When + kafkaMetrics.checkAndBindMetrics(registry); + //Then + assertThat(registry.getMeters()).hasSize(1); + assertThat(registry.getMeters().get(0).getId().getTags()).hasSize(1); //only version + //Given + tags.put("key0", "value0"); + //When + kafkaMetrics.checkAndBindMetrics(registry); + //Then + assertThat(registry.getMeters()).hasSize(1); + assertThat(registry.getMeters().get(0).getId().getTags()).hasSize(2); + } + + @Test void shouldRemoveMeterWithLessTags() { + //Given + Supplier> supplier = () -> { + MetricName firstName = new MetricName("a", "b", "c", Collections.emptyMap()); + KafkaMetric firstMetric = new KafkaMetric(this, firstName, new Value(), new MetricConfig(), Time.SYSTEM); + Map tags = new LinkedHashMap<>(); + tags.put("key0", "value0"); + MetricName secondName = new MetricName("a", "b", "c", tags); + KafkaMetric secondMetric = new KafkaMetric(this, secondName, new Value(), new MetricConfig(), Time.SYSTEM); + Map metrics = new LinkedHashMap<>(); + metrics.put(firstName, firstMetric); + metrics.put(secondName, secondMetric); + return metrics; + }; + KafkaMetrics kafkaMetrics = new KafkaMetrics(supplier); + MeterRegistry registry = new SimpleMeterRegistry(); + //When + kafkaMetrics.checkAndBindMetrics(registry); + //Then + assertThat(registry.getMeters()).hasSize(1); + Meter meter = registry.getMeters().get(0); + assertThat(meter.getId().getTags()).hasSize(2); // version + key0 + } +} diff --git a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaProducerMetricsTest.java 
b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaProducerMetricsTest.java new file mode 100644 index 0000000000..cf35d07a99 --- /dev/null +++ b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaProducerMetricsTest.java @@ -0,0 +1,71 @@ +/** + * Copyright 2020 Pivotal Software, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * https://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.core.instrument.binder.kafka; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import java.util.Properties; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; + +import static io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics.METRIC_NAME_PREFIX; +import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; +import static org.assertj.core.api.Assertions.assertThat; + +class KafkaProducerMetricsTest { + private final static String BOOTSTRAP_SERVERS = "localhost:9092"; + private Tags tags = Tags.of("app", "myapp", "version", "1"); + + @Test void shouldCreateMeters() { + try (Producer producer = createProducer()) { + KafkaClientMetrics metrics = new KafkaClientMetrics(producer); + MeterRegistry registry = new SimpleMeterRegistry(); + + metrics.bindTo(registry); + assertThat(registry.getMeters()) + .hasSizeGreaterThan(0) + .extracting(meter -> meter.getId().getName()) + .allMatch(s -> s.startsWith(METRIC_NAME_PREFIX)); + } + } + + @Test void shouldCreateMetersWithTags() { + try (Producer producer = createProducer()) { + KafkaClientMetrics metrics = new KafkaClientMetrics(producer, tags); + MeterRegistry registry = new 
SimpleMeterRegistry(); + + metrics.bindTo(registry); + + assertThat(registry.getMeters()) + .hasSizeGreaterThan(0) + .extracting(meter -> meter.getId().getTag("app")) + .allMatch(s -> s.equals("myapp")); + } + } + + private Producer createProducer() { + Properties producerConfig = new Properties(); + producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + producerConfig.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerConfig.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + return new KafkaProducer<>(producerConfig); + } +} diff --git a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaStreamsMetricsTest.java b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaStreamsMetricsTest.java new file mode 100644 index 0000000000..4d7f850ef0 --- /dev/null +++ b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaStreamsMetricsTest.java @@ -0,0 +1,70 @@ +/** + * Copyright 2020 Pivotal Software, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * https://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.core.instrument.binder.kafka; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import java.util.Properties; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.junit.jupiter.api.Test; + +import static io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics.METRIC_NAME_PREFIX; +import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.assertj.core.api.Assertions.assertThat; + +class KafkaStreamsMetricsTest { + private final static String BOOTSTRAP_SERVERS = "localhost:9092"; + private Tags tags = Tags.of("app", "myapp", "version", "1"); + + @Test void shouldCreateMeters() { + try (KafkaStreams kafkaStreams = createStreams()) { + KafkaStreamsMetrics metrics = new KafkaStreamsMetrics(kafkaStreams); + MeterRegistry registry = new SimpleMeterRegistry(); + + metrics.bindTo(registry); + assertThat(registry.getMeters()) + .hasSizeGreaterThan(0) + .extracting(meter -> meter.getId().getName()) + .allMatch(s -> s.startsWith(METRIC_NAME_PREFIX)); + } + } + + @Test void shouldCreateMetersWithTags() { + try (KafkaStreams kafkaStreams = createStreams()) { + KafkaStreamsMetrics metrics = new KafkaStreamsMetrics(kafkaStreams, tags); + MeterRegistry registry = new SimpleMeterRegistry(); + + metrics.bindTo(registry); + + assertThat(registry.getMeters()) + .hasSizeGreaterThan(0) + .extracting(meter -> meter.getId().getTag("app")) + .allMatch(s -> 
s.equals("myapp")); + } + } + + private KafkaStreams createStreams() { + StreamsBuilder builder = new StreamsBuilder(); + builder.stream("input").to("output"); + Properties streamsConfig = new Properties(); + streamsConfig.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + streamsConfig.put(APPLICATION_ID_CONFIG, "app"); + return new KafkaStreams(builder.build(), streamsConfig); + } +} diff --git a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/OldKafkaConsumerMetricsTest.java b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/OldKafkaConsumerMetricsTest.java new file mode 100644 index 0000000000..148546def1 --- /dev/null +++ b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/OldKafkaConsumerMetricsTest.java @@ -0,0 +1,158 @@ +/** + * Copyright 2018 Pivotal Software, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * https://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.core.instrument.binder.kafka; + +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link KafkaConsumerMetrics}. 
+ * + * @author Jon Schneider + * @author Johnny Lim + */ +@Disabled // as KafkaConsumerMetrics is deprecated +class OldKafkaConsumerMetricsTest { + private final static String TOPIC = "my-example-topic"; + private final static String BOOTSTRAP_SERVERS = "localhost:9092"; + private static int consumerCount = 0; + + private Tags tags = Tags.of("app", "myapp", "version", "1"); + private KafkaConsumerMetrics kafkaConsumerMetrics = new KafkaConsumerMetrics(tags); + + @Test + void verifyConsumerMetricsWithExpectedTags() { + try (Consumer consumer = createConsumer()) { + + MeterRegistry registry = new SimpleMeterRegistry(); + kafkaConsumerMetrics.bindTo(registry); + + // consumer coordinator metrics + Gauge assignedPartitions = registry.get("kafka.consumer.assigned.partitions").tags(tags).gauge(); + assertThat(assignedPartitions.getId().getTag("client.id")).isEqualTo("consumer-" + consumerCount); + + // global connection metrics + Gauge connectionCount = registry.get("kafka.consumer.connection.count").tags(tags).gauge(); + assertThat(connectionCount.getId().getTag("client.id")).startsWith("consumer-" + consumerCount); + } + } + + @Test + void metricsReportedPerMultipleConsumers() { + try (Consumer consumer = createConsumer(); + Consumer consumer2 = createConsumer()) { + + MeterRegistry registry = new SimpleMeterRegistry(); + kafkaConsumerMetrics.bindTo(registry); + + // fetch metrics + registry.get("kafka.consumer.fetch.total").tag("client.id", "consumer-" + consumerCount).functionCounter(); + registry.get("kafka.consumer.fetch.total").tag("client.id", "consumer-" + (consumerCount - 1)).functionCounter(); + } + } + + @Test + void newConsumersAreDiscoveredByListener() throws InterruptedException { + MeterRegistry registry = new SimpleMeterRegistry(); + kafkaConsumerMetrics.bindTo(registry); + + CountDownLatch latch = new CountDownLatch(1); + registry.config().onMeterAdded(m -> { + if (m.getId().getName().contains("kafka")) + latch.countDown(); + }); + + try (Consumer 
consumer = createConsumer()) { + latch.await(10, TimeUnit.SECONDS); + } + } + + @Test + void verifyKafkaMajorVersion() { + try (Consumer consumer = createConsumer()) { + Tags tags = Tags.of("client.id", "consumer-" + consumerCount); + assertThat(kafkaConsumerMetrics.kafkaMajorVersion(tags)).isGreaterThanOrEqualTo(2); + } + } + + @Test + void returnsNegativeKafkaMajorVersionWhenMBeanInstanceNotFound() { + try (Consumer consumer = createConsumer()) { + Tags tags = Tags.of("client.id", "invalid"); + assertThat(kafkaConsumerMetrics.kafkaMajorVersion(tags)).isEqualTo(-1); + } + } + + @Test + void returnsNegativeKafkaMajorVersionForEmptyTags() { + try (Consumer consumer = createConsumer()) { + assertThat(kafkaConsumerMetrics.kafkaMajorVersion(Tags.empty())).isEqualTo(-1); + } + } + + @Test + void consumerBeforeBindingWhenClosedShouldRemoveMeters() { + MeterRegistry registry = new SimpleMeterRegistry(); + try (Consumer consumer = createConsumer()) { + kafkaConsumerMetrics.bindTo(registry); + + Gauge gauge = registry.get("kafka.consumer.assigned.partitions").gauge(); + assertThat(gauge.getId().getTag("client.id")).isEqualTo("consumer-" + consumerCount); + } + assertThat(registry.find("kafka.consumer.assigned.partitions").gauge()).isNull(); + } + + @Test + void consumerAfterBindingWhenClosedShouldRemoveMeters() { + MeterRegistry registry = new SimpleMeterRegistry(); + kafkaConsumerMetrics.bindTo(registry); + + try (Consumer consumer = createConsumer()) { + Gauge gauge = registry.get("kafka.consumer.assigned.partitions").gauge(); + assertThat(gauge.getId().getTag("client.id")).isEqualTo("consumer-" + consumerCount); + } + assertThat(registry.find("kafka.consumer.assigned.partitions").gauge()).isNull(); + } + + private Consumer createConsumer() { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "MicrometerTestConsumer"); + 
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + + Consumer consumer = new KafkaConsumer<>(props); + consumer.subscribe(Collections.singletonList(TOPIC)); + consumerCount++; + return consumer; + } + +}