Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] PIP-342: OTel client metrics support #22179

Merged
merged 26 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
babad52
WIP: PIP-342: OTel client metrics support
merlimat Feb 29, 2024
8e184be
Addressed comments
merlimat Mar 4, 2024
4d0120b
Removed MetricsCardinality from API
merlimat Mar 5, 2024
252196e
Fixed tests code
merlimat Mar 5, 2024
4b31eb1
Merge remote-tracking branch 'apache/master' into client-otel
merlimat Mar 5, 2024
445231c
Ensure single-initialization of InstrumentProvider
merlimat Mar 5, 2024
e840d06
Added unit tests
merlimat Mar 6, 2024
6cc759a
Merge remote-tracking branch 'apache/master' into client-otel
merlimat Mar 6, 2024
a0cb78d
Addressed more comments
merlimat Mar 8, 2024
5af2018
Share the histogram objects
merlimat Mar 9, 2024
218f57b
Added javadoc and reset old metrics default
merlimat Mar 11, 2024
bd1fb3f
Added missing license header
merlimat Mar 11, 2024
59bf64b
Removed trailing spaces
merlimat Mar 11, 2024
857c0a3
Fixed checkstyle
merlimat Mar 11, 2024
e103369
Checkstyle
merlimat Mar 11, 2024
1f2d0c8
Checkstyle
merlimat Mar 11, 2024
413360c
Fixed spotbugs
merlimat Mar 11, 2024
345f240
Fixed compilation
merlimat Mar 11, 2024
0b60e7d
Fix
merlimat Mar 11, 2024
70dd347
Fixed test
merlimat Mar 12, 2024
8fe109f
Merge remote-tracking branch 'apache/master' into client-otel
merlimat Mar 13, 2024
d90f621
Fixed tests mock
merlimat Mar 14, 2024
22d46af
Fixed test
merlimat Mar 14, 2024
a9f3857
Attach subscription attribute to consumer metrics
merlimat Mar 14, 2024
980bae3
More test fix
merlimat Mar 14, 2024
545a7d3
Merge remote-tracking branch 'apache/master' into client-otel
merlimat Mar 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added unit tests
  • Loading branch information
merlimat committed Mar 6, 2024
commit e840d06dcd6f6305378f1c7c065b250ab1f5d086
5 changes: 5 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
merlimat marked this conversation as resolved.
Show resolved Hide resolved
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-batch-discovery-triggerers</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,333 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.metrics;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.MetricDataType;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = "broker-api")
public class ClientMetricsTest extends ProducerConsumerBase {

InMemoryMetricReader reader;
OpenTelemetry otel;

@BeforeMethod
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();

this.reader = InMemoryMetricReader.create();
SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder()
.registerMetricReader(reader)
.build();
this.otel = OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build();
}

@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

private Map<String, MetricData> collectMetrics() {
Map<String, MetricData> metrics = new TreeMap<>();
for (MetricData md : reader.collectAllMetrics()) {
metrics.put(md.getName(), md);
}
return metrics;
}

private void assertCounterValue(Map<String, MetricData> metrics, String name, long expectedValue,
asafm marked this conversation as resolved.
Show resolved Hide resolved
Attributes expectedAttributes) {
assertEquals(getCounterValue(metrics, name, expectedAttributes), expectedValue);
}

private long getCounterValue(Map<String, MetricData> metrics, String name,
Attributes expectedAttributes) {
MetricData md = metrics.get(name);
assertNotNull(md, "metric not found: " + name);
assertEquals(md.getType(), MetricDataType.LONG_SUM);

for (var ex : md.getLongSumData().getPoints()) {
if (ex.getAttributes().equals(expectedAttributes)) {
return ex.getValue();
}
}

fail("metric attributes not found: " + expectedAttributes);
return -1;
}

private void assertHistoCountValue(Map<String, MetricData> metrics, String name, long expectedCount,
Attributes expectedAttributes) {
assertEquals(getHistoCountValue(metrics, name, expectedAttributes), expectedCount);
}

private long getHistoCountValue(Map<String, MetricData> metrics, String name,
Attributes expectedAttributes) {
MetricData md = metrics.get(name);
assertNotNull(md, "metric not found: " + name);
assertEquals(md.getType(), MetricDataType.HISTOGRAM);

for (var ex : md.getHistogramData().getPoints()) {
if (ex.getAttributes().equals(expectedAttributes)) {
return ex.getCount();
}
}

fail("metric attributes not found: " + expectedAttributes);
return -1;
}

@Test
public void testProducerMetrics() throws Exception {
String topic = newTopicName();

PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.openTelemetry(otel)
.build();

Producer<String> producer = client.newProducer(Schema.STRING)
.topic(topic)
.create();

for (int i = 0; i < 5; i++) {
producer.send("Hello");
}

Attributes nsAttrs = Attributes.builder()
.put("pulsar.tenant", "my-property")
.put("pulsar.namespace", "my-property/my-ns")
merlimat marked this conversation as resolved.
Show resolved Hide resolved
.build();
Attributes nsAttrsSuccess = nsAttrs.toBuilder()
.put("success", true)
.build();

var metrics = collectMetrics();
System.err.println("All metrics: " + metrics.keySet());
merlimat marked this conversation as resolved.
Show resolved Hide resolved

assertCounterValue(metrics, "pulsar.client.connections.opened", 1, Attributes.empty());
assertCounterValue(metrics, "pulsar.client.producer.message.pending.count", 0, nsAttrs);
asafm marked this conversation as resolved.
Show resolved Hide resolved
assertCounterValue(metrics, "pulsar.client.producer.message.pending.size", 0, nsAttrs);

assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1,
Attributes.builder()
.put("pulsar.lookup.transport-type", "binary")
.put("pulsar.lookup.type", "topic")
.put("success", true)
merlimat marked this conversation as resolved.
Show resolved Hide resolved
.build());
assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1,
Attributes.builder()
.put("pulsar.lookup.transport-type", "binary")
.put("pulsar.lookup.type", "metadata")
.put("success", true)
.build());

assertHistoCountValue(metrics, "pulsar.client.producer.message.send.duration", 5, nsAttrsSuccess);
assertHistoCountValue(metrics, "pulsar.client.producer.rpc.send.duration", 5, nsAttrsSuccess);
assertCounterValue(metrics, "pulsar.client.producer.message.send.size", "hello".length() * 5, nsAttrs);


assertCounterValue(metrics, "pulsar.client.producer.opened", 1, nsAttrs);

producer.close();
client.close();

metrics = collectMetrics();
assertCounterValue(metrics, "pulsar.client.producer.closed", 1, nsAttrs);
assertCounterValue(metrics, "pulsar.client.connections.closed", 1, Attributes.empty());
}

@Test
public void testConnectionsFailedMetrics() throws Exception {
String topic = newTopicName();

@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://invalid-pulsar-address:1234")
.operationTimeout(3, TimeUnit.SECONDS)
.openTelemetry(otel)
.build();

try {
merlimat marked this conversation as resolved.
Show resolved Hide resolved
client.newProducer(Schema.STRING)
.topic(topic)
.create();
fail("Should have failed the producer creation");
} catch (Exception e) {
// Expected
}

var metrics = collectMetrics();

assertTrue(getCounterValue(metrics, "pulsar.client.connections.failed",
merlimat marked this conversation as resolved.
Show resolved Hide resolved
Attributes.builder().put("pulsar.failure.type", "tcp-failed").build()) >= 1);
}

@Test
public void testPublishFailedMetrics() throws Exception {
String topic = newTopicName();

@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(admin.getServiceUrl())
.operationTimeout(3, TimeUnit.SECONDS)
.openTelemetry(otel)
.build();

@Cleanup
Producer<String> producer = client.newProducer(Schema.STRING)
.topic(topic)
.sendTimeout(3, TimeUnit.SECONDS)
.create();

// Make the client switch to non-existing broker to make publish fail
client.updateServiceUrl("pulsar://invalid-address:6650");


try {
producer.send("Hello");
fail("Should have failed to publish");
} catch (Exception e) {
// expected
}

var metrics = collectMetrics();

Attributes nsAttrs = Attributes.builder()
.put("pulsar.tenant", "my-property")
.put("pulsar.namespace", "my-property/my-ns")
.build();
Attributes nsAttrsFailure = nsAttrs.toBuilder()
.put("success", false)
.build();

assertCounterValue(metrics, "pulsar.client.producer.message.pending.count", 0, nsAttrs);
assertCounterValue(metrics, "pulsar.client.producer.message.pending.size", 0, nsAttrs);
assertHistoCountValue(metrics, "pulsar.client.producer.message.send.duration", 1, nsAttrsFailure);
assertHistoCountValue(metrics, "pulsar.client.producer.rpc.send.duration", 1, nsAttrsFailure);
}

@Test
public void testConsumerMetrics() throws Exception {
String topic = newTopicName();

PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.openTelemetry(otel)
.build();

@Cleanup
Producer<String> producer = client.newProducer(Schema.STRING)
.topic(topic)
.create();

Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("my-sub")
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared)
.subscribe();

for (int i = 0; i < 10; i++) {
producer.send("Hello");
}

Thread.sleep(1000);

Attributes nsAttrs = Attributes.builder()
.put("pulsar.tenant", "my-property")
.put("pulsar.namespace", "my-property/my-ns")
.build();
var metrics = collectMetrics();

assertCounterValue(metrics, "pulsar.client.connections.opened", 1, Attributes.empty());

assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 2,
Attributes.builder()
.put("pulsar.lookup.transport-type", "binary")
.put("pulsar.lookup.type", "topic")
.put("success", true)
.build());
assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 2,
Attributes.builder()
.put("pulsar.lookup.transport-type", "binary")
.put("pulsar.lookup.type", "metadata")
.put("success", true)
.build());

assertCounterValue(metrics, "pulsar.client.consumer.prefetched.count", 10, nsAttrs);
assertCounterValue(metrics, "pulsar.client.consumer.prefetched.size", "hello".length() * 10, nsAttrs);
assertCounterValue(metrics, "pulsar.client.consumer.opened", 1, nsAttrs);

Message<String> msg1 = consumer.receive();
consumer.acknowledge(msg1);

Message<String> msg2 = consumer.receive();
consumer.negativeAcknowledge(msg2);

/* Message<String> msg3 = */ consumer.receive();

metrics = collectMetrics();
assertCounterValue(metrics, "pulsar.client.consumer.prefetched.count", 7, nsAttrs);
merlimat marked this conversation as resolved.
Show resolved Hide resolved
assertCounterValue(metrics, "pulsar.client.consumer.prefetched.size", "hello".length() * 7, nsAttrs);

// Let msg3 to reach ack-timeout
Thread.sleep(3000);
asafm marked this conversation as resolved.
Show resolved Hide resolved

metrics = collectMetrics();
assertCounterValue(metrics, "pulsar.client.consumer.prefetched.count", 8, nsAttrs);
assertCounterValue(metrics, "pulsar.client.consumer.prefetched.size", "hello".length() * 8, nsAttrs);

assertCounterValue(metrics, "pulsar.client.consumer.message.ack", 1, nsAttrs);
merlimat marked this conversation as resolved.
Show resolved Hide resolved
assertCounterValue(metrics, "pulsar.client.consumer.message.nack", 1, nsAttrs);
assertCounterValue(metrics, "pulsar.client.consumer.message.ack.timeout", 1, nsAttrs);

client.close();

metrics = collectMetrics();
assertCounterValue(metrics, "pulsar.client.consumer.closed", 1, nsAttrs);
assertCounterValue(metrics, "pulsar.client.connections.closed", 1, Attributes.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,6 @@ public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(T
CompletableFuture<PartitionedTopicMetadata> newFuture =
getPartitionedTopicMetadata(serviceNameResolver.resolveHost(), topicName);
newFutureCreated.setValue(newFuture);
newFuture.thenRun(() -> {
histoGetTopicMetadata.recordSuccess(System.nanoTime() - startTime);
}).exceptionally(x -> {
histoGetTopicMetadata.recordFailure(System.nanoTime() - startTime);
return null;
});
return newFuture;
});
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,8 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
"The number of acknowledged messages", topic, Attributes.empty());
consumerNacksCounter = ip.newCounter("pulsar.client.consumer.message.nack", Unit.Messages,
"The number of negatively acknowledged messages", topic, Attributes.empty());
consumerDlqMessagesCounter = ip.newCounter("pulsar.client.consumer.dlq", Unit.Messages,
"Number of messages sent to DLQ", topic, Attributes.empty());
consumerDlqMessagesCounter = ip.newCounter("pulsar.client.consumer.message.dlq", Unit.Messages,
"The number of messages sent to DLQ", topic, Attributes.empty());
grabCnx();

consumersOpenedCounter.increment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,15 +291,15 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
"Publish RPC latency experienced internally by the client when sending data to receiving an ack", topic,
Attributes.empty());
publishedBytesCounter = ip.newCounter("pulsar.client.producer.message.send.size",
merlimat marked this conversation as resolved.
Show resolved Hide resolved
Unit.Bytes, "Bytes published", topic, Attributes.empty());
Unit.Bytes, "The number of bytes published", topic, Attributes.empty());
pendingMessagesCounter = ip.newUpDownCounter("pulsar.client.producer.message.pending.count", Unit.Messages,
"Pending messages for this producer", topic, Attributes.empty());
merlimat marked this conversation as resolved.
Show resolved Hide resolved
pendingBytesCounter = ip.newUpDownCounter("pulsar.client.producer.message.pending.size", Unit.Bytes,
"Pending bytes for this producer", topic, Attributes.empty());
merlimat marked this conversation as resolved.
Show resolved Hide resolved
producersOpenedCounter = ip.newCounter("pulsar.client.producer.opened", Unit.Sessions,
"Counter of producer sessions opened", topic, Attributes.empty());
"The number of producer sessions opened", topic, Attributes.empty());
producersClosedCounter = ip.newCounter("pulsar.client.producer.closed", Unit.Sessions,
"Counter of producer sessions closed", topic, Attributes.empty());
"The number of producer sessions closed", topic, Attributes.empty());

this.connectionHandler = new ConnectionHandler(this,
new BackoffBuilder()
Expand Down Expand Up @@ -2145,6 +2145,7 @@ private void failPendingMessages(ClientCnx cnx, PulsarClientException ex) {
log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, producerName,
op.sequenceId, t);
}

client.getMemoryLimitController().releaseMemory(op.uncompressedSize);
ReferenceCountUtil.safeRelease(op.cmd);
op.recycle();
Expand Down
Loading
Loading