Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] PIP-342: OTel client metrics support #22179

Merged
merged 26 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
babad52
WIP: PIP-342: OTel client metrics support
merlimat Feb 29, 2024
8e184be
Addressed comments
merlimat Mar 4, 2024
4d0120b
Removed MetricsCardinality from API
merlimat Mar 5, 2024
252196e
Fixed tests code
merlimat Mar 5, 2024
4b31eb1
Merge remote-tracking branch 'apache/master' into client-otel
merlimat Mar 5, 2024
445231c
Ensure single-initialization of InstrumentProvider
merlimat Mar 5, 2024
e840d06
Added unit tests
merlimat Mar 6, 2024
6cc759a
Merge remote-tracking branch 'apache/master' into client-otel
merlimat Mar 6, 2024
a0cb78d
Addressed more comments
merlimat Mar 8, 2024
5af2018
Share the histogram objects
merlimat Mar 9, 2024
218f57b
Added javadoc and reset old metrics default
merlimat Mar 11, 2024
bd1fb3f
Added missing license header
merlimat Mar 11, 2024
59bf64b
Removed trailing spaces
merlimat Mar 11, 2024
857c0a3
Fixed checkstyle
merlimat Mar 11, 2024
e103369
Checkstyle
merlimat Mar 11, 2024
1f2d0c8
Checkstyle
merlimat Mar 11, 2024
413360c
Fixed spotbugs
merlimat Mar 11, 2024
345f240
Fixed compilation
merlimat Mar 11, 2024
0b60e7d
Fix
merlimat Mar 11, 2024
70dd347
Fixed test
merlimat Mar 12, 2024
8fe109f
Merge remote-tracking branch 'apache/master' into client-otel
merlimat Mar 13, 2024
d90f621
Fixed tests mock
merlimat Mar 14, 2024
22d46af
Fixed test
merlimat Mar 14, 2024
a9f3857
Attach subscription attribute to consumer metrics
merlimat Mar 14, 2024
980bae3
More test fix
merlimat Mar 14, 2024
545a7d3
Merge remote-tracking branch 'apache/master' into client-otel
merlimat Mar 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Addressed comments
  • Loading branch information
merlimat committed Mar 4, 2024
commit 8e184be917670cdfe8e48802f71c6ed3dec3afb0
Original file line number Diff line number Diff line change
Expand Up @@ -93,20 +93,20 @@ public BinaryProtoLookupService(PulsarClientImpl client,
this.listenerName = listenerName;
updateServiceUrl(serviceUrl);

Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
Attributes attrs = Attributes.of(AttributeKey.stringKey("pulsar.lookup.transport-type"), "binary");

histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
merlimat marked this conversation as resolved.
Show resolved Hide resolved
asafm marked this conversation as resolved.
Show resolved Hide resolved
merlimat marked this conversation as resolved.
Show resolved Hide resolved
"Lookup operations",
attrs.toBuilder().put("type", "topic").build());
"Duration of lookup operations",
attrs.toBuilder().put("pulsar.lookup.type", "topic").build());
histoGetTopicMetadata = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
"Lookup operations",
attrs.toBuilder().put("type", "metadata").build());
"Duration of lookup operations",
attrs.toBuilder().put("pulsar.lookup.type", "metadata").build());
histoGetSchema = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
"Lookup operations",
attrs.toBuilder().put("type", "schema").build());
"Duration of lookup operations",
attrs.toBuilder().put("pulsar.lookup.type", "schema").build());
histoListTopics = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
"Lookup operations",
attrs.toBuilder().put("type", "list-topics").build());
"Duration of lookup operations",
attrs.toBuilder().put("pulsar.lookup.type", "list-topics").build());
}

@Override
Expand Down Expand Up @@ -160,9 +160,9 @@ public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(T
getPartitionedTopicMetadata(serviceNameResolver.resolveHost(), topicName);
newFutureCreated.setValue(newFuture);
newFuture.thenRun(() -> {
histoGetBroker.recordSuccess(System.nanoTime() - startTime);
histoGetTopicMetadata.recordSuccess(System.nanoTime() - startTime);
}).exceptionally(x -> {
histoGetBroker.recordFailure(System.nanoTime() - startTime);
histoGetTopicMetadata.recordFailure(System.nanoTime() - startTime);
return null;
});
return newFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,9 @@ public ClientCnx(InstrumentProvider instrumentProvider, ClientConfigurationData
this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion()
+ (conf.getDescription() == null ? "" : ("-" + conf.getDescription()));
this.connectionsOpenedCounter = instrumentProvider.newCounter("pulsar.client.connections.opened", Unit.Connections,
merlimat marked this conversation as resolved.
Show resolved Hide resolved
"Counter of connections opened", Attributes.empty());
"The number of connections opened", Attributes.empty());
this.connectionsClosedCounter = instrumentProvider.newCounter("pulsar.client.connections.closed", Unit.Connections,
asafm marked this conversation as resolved.
Show resolved Hide resolved
"Counter of connections closed", Attributes.empty());
"The number of connections closed", Attributes.empty());

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,12 @@ public ConnectionPool(InstrumentProvider instrumentProvider,
}, idleDetectionIntervalSeconds, idleDetectionIntervalSeconds, TimeUnit.SECONDS);
}

connectionsTcpFailureCounter = instrumentProvider.newCounter("pulsar.client.connections.failed", Unit.None,
"Counter of failed connections", Attributes.builder().put("type", "tcp-failed").build());
connectionsTcpFailureCounter =
instrumentProvider.newCounter("pulsar.client.connections.failed", Unit.Connections,
"The number of failed connection attempts", Attributes.builder().put("pulsar.failure.type", "tcp-failed").build());
connectionsHandshakeFailureCounter = instrumentProvider.newCounter("pulsar.client.connections.failed",
Unit.None, "Counter of failed connections",
Attributes.builder().put("type", "handshake").build());
Unit.Connections, "The number of failed connection attempts",
Attributes.builder().put("pulsar.failure.type", "handshake").build());
}

private static AddressResolver<InetSocketAddress> createAddressResolver(ClientConfigurationData conf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1228,10 +1228,7 @@ public int getTotalIncomingMessages() {

protected void clearIncomingMessages() {
// release messages if they are pooled messages
incomingMessages.forEach(msg -> {

msg.release();
});
incomingMessages.forEach(Message::release);
incomingMessages.clear();
resetIncomingMessageSize();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,19 +408,19 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
"Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
"Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,
"Number of messages received", attrs);
bytesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Bytes,
"Bytes received", attrs);
messagesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.preteched.messages", Unit.Messages,
messagesReceivedCounter = ip.newCounter("pulsar.client.received.count", Unit.Messages,
merlimat marked this conversation as resolved.
Show resolved Hide resolved
"The number of messages explicitly received by the consumer application", attrs);
bytesReceivedCounter = ip.newCounter("pulsar.client.received.size", Unit.Bytes,
"The number of bytes explicitly received by the consumer application", attrs);
messagesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.prefetched.count", Unit.Messages,
"Number of messages currently sitting in the consumer pre-fetch queue", attrs);
bytesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.preteched", Unit.Bytes,
bytesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.prefetched.size", Unit.Bytes,
"Total number of bytes currently sitting in the consumer pre-fetch queue", attrs);

consumerAcksCounter = ip.newCounter("pulsar.client.consumer.ack", Unit.Messages,
"Number of ack operations", attrs);
consumerNacksCounter = ip.newCounter("pulsar.client.consumer.nack", Unit.Messages,
"Number of negative ack operations", attrs);
consumerAcksCounter = ip.newCounter("pulsar.client.consumer.message.ack", Unit.Messages,
"The number of acknowledged messages", attrs);
consumerNacksCounter = ip.newCounter("pulsar.client.consumer.message.nack", Unit.Messages,
"The number of negatively acknowledged messages", attrs);
consumerDlqMessagesCounter = ip.newCounter("pulsar.client.consumer.dlq", Unit.Messages,
"Number of messages sent to DLQ", attrs);
grabCnx();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,20 @@ public HttpLookupService(InstrumentProvider instrumentProvider, ClientConfigurat
this.useTls = conf.isUseTls();
this.listenerName = conf.getListenerName();

Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
Attributes attrs = Attributes.of(AttributeKey.stringKey("pulsar.lookup.transport-type"), "binary");
merlimat marked this conversation as resolved.
Show resolved Hide resolved

histoGetBroker = instrumentProvider.newLatencyHistogram("pulsar.client.lookup",
merlimat marked this conversation as resolved.
Show resolved Hide resolved
"Lookup operations",
attrs.toBuilder().put("type", "topic").build());
"Duration of lookup operations",
attrs.toBuilder().put("pulsar.lookup.type", "topic").build());
histoGetTopicMetadata = instrumentProvider.newLatencyHistogram("pulsar.client.lookup",
"Lookup operations",
attrs.toBuilder().put("type", "metadata").build());
"Duration of lookup operations",
attrs.toBuilder().put("pulsar.lookup.type", "metadata").build());
histoGetSchema = instrumentProvider.newLatencyHistogram("pulsar.client.lookup",
"Lookup operations",
attrs.toBuilder().put("type", "schema").build());
"Duration of lookup operations",
attrs.toBuilder().put("pulsar.lookup.type", "schema").build());
histoListTopics = instrumentProvider.newLatencyHistogram("pulsar.client.lookup",
"Lookup operations",
attrs.toBuilder().put("type", "list-topics").build());
"Duration of lookup operations",
attrs.toBuilder().put("pulsar.lookup.type", "list-topics").build());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,15 +285,15 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration

InstrumentProvider ip = client.instrumentProvider();
Attributes attrs = ip.getAttributes(topic);
latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.latency",
latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.message.send.duration",
merlimat marked this conversation as resolved.
Show resolved Hide resolved
"Publish latency experienced by the application, includes client batching time", attrs);
rpcLatencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.rpc.latency",
rpcLatencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.rpc.send.duration",
"Publish RPC latency experienced internally by the client when sending data to receiving an ack", attrs);
publishedBytesCounter = ip.newCounter("pulsar.client.producer.published",
publishedBytesCounter = ip.newCounter("pulsar.client.producer.message.send.size",
merlimat marked this conversation as resolved.
Show resolved Hide resolved
Unit.Bytes, "Bytes published", attrs);
pendingMessagesCounter = ip.newUpDownCounter("pulsar.client.producer.pending.messages.count", Unit.Messages,
pendingMessagesCounter = ip.newUpDownCounter("pulsar.client.producer.message.pending.count", Unit.Messages,
"Pending messages for this producer", attrs);
pendingBytesCounter = ip.newUpDownCounter("pulsar.client.producer.pending.count", Unit.Bytes,
pendingBytesCounter = ip.newUpDownCounter("pulsar.client.producer.message.pending.size", Unit.Bytes,
"Pending bytes for this producer", attrs);
producersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
merlimat marked this conversation as resolved.
Show resolved Hide resolved
"Counter of sessions opened", attrs.toBuilder().put("type", "producer").build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase<?> consumerBa

InstrumentProvider ip = client.instrumentProvider();
Attributes attrs = ip.getAttributes(consumerBase.getTopic());
consumerAckTimeoutsCounter = ip.newCounter("pulsar.client.consumer.ack.timeout", Unit.Messages,
consumerAckTimeoutsCounter = ip.newCounter("pulsar.client.consumer.message.ack.timeout", Unit.Messages,
"Number of ack timeouts events", attrs);

if (conf.getAckTimeoutRedeliveryBackoff() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,6 @@ public class ClientConfigurationData implements Serializable, Cloneable {
)
private String description;


private transient OpenTelemetry openTelemetry;
asafm marked this conversation as resolved.
Show resolved Hide resolved

private MetricsCardinality openTelemetryMetricsCardinality = MetricsCardinality.Topic;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package org.apache.pulsar.client.impl.metrics;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.api.MetricsCardinality;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
Expand All @@ -14,7 +16,17 @@ public class InstrumentProvider {
private final MetricsCardinality metricsCardinality;

public InstrumentProvider(ClientConfigurationData conf) {
this.meter = conf.getOpenTelemetry().getMeter("org.apache.pulsar.client");
OpenTelemetry otel = conf.getOpenTelemetry();
if (otel == null) {
// By default, metrics are disabled, unless the OTel java agent is configured.
// This allows enabling metrics without any code change.
dao-jun marked this conversation as resolved.
Show resolved Hide resolved
otel = GlobalOpenTelemetry.get();
}

this.meter = otel.getMeterProvider()
.meterBuilder("org.apache.pulsar.client")
.setInstrumentationVersion(PulsarVersion.getVersion())
.build();
this.metricsCardinality = conf.getOpenTelemetryMetricsCardinality();
}

Expand All @@ -29,17 +41,17 @@ public Attributes getAttributes(String topic) {
switch (metricsCardinality) {
case Partition:
if (tn.isPartitioned()) {
ab.put("partition", tn.getPartitionIndex());
ab.put("pulsar.partition", tn.getPartitionIndex());
}
// fallthrough
case Topic:
ab.put("topic", tn.getPartitionedTopicName());
ab.put("pulsar.topic", tn.getPartitionedTopicName());
// fallthrough
case Namespace:
ab.put("namespace", tn.getNamespace());
ab.put("pulsar.namespace", tn.getNamespace());
// fallthrough
case Tenant:
ab.put("tenant", tn.getTenant());
ab.put("pulsar.tenant", tn.getTenant());
}

return ab.build();
Expand Down
Loading