Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] PIP-342: OTel client metrics support #22179

Merged
merged 26 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
babad52
WIP: PIP-342: OTel client metrics support
merlimat Feb 29, 2024
8e184be
Addressed comments
merlimat Mar 4, 2024
4d0120b
Removed MetricsCardinality from API
merlimat Mar 5, 2024
252196e
Fixed tests code
merlimat Mar 5, 2024
4b31eb1
Merge remote-tracking branch 'apache/master' into client-otel
merlimat Mar 5, 2024
445231c
Ensure single-initialization of InstrumentProvider
merlimat Mar 5, 2024
e840d06
Added unit tests
merlimat Mar 6, 2024
6cc759a
Merge remote-tracking branch 'apache/master' into client-otel
merlimat Mar 6, 2024
a0cb78d
Addressed more comments
merlimat Mar 8, 2024
5af2018
Share the histogram objects
merlimat Mar 9, 2024
218f57b
Added javadoc and reset old metrics default
merlimat Mar 11, 2024
bd1fb3f
Added missing license header
merlimat Mar 11, 2024
59bf64b
Removed trailing spaces
merlimat Mar 11, 2024
857c0a3
Fixed checkstyle
merlimat Mar 11, 2024
e103369
Checkstyle
merlimat Mar 11, 2024
1f2d0c8
Checkstyle
merlimat Mar 11, 2024
413360c
Fixed spotbugs
merlimat Mar 11, 2024
345f240
Fixed compilation
merlimat Mar 11, 2024
0b60e7d
Fix
merlimat Mar 11, 2024
70dd347
Fixed test
merlimat Mar 12, 2024
8fe109f
Merge remote-tracking branch 'apache/master' into client-otel
merlimat Mar 13, 2024
d90f621
Fixed tests mock
merlimat Mar 14, 2024
22d46af
Fixed test
merlimat Mar 14, 2024
a9f3857
Attach subscription attribute to consumer metrics
merlimat Mar 14, 2024
980bae3
More test fix
merlimat Mar 14, 2024
545a7d3
Merge remote-tracking branch 'apache/master' into client-otel
merlimat Mar 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions distribution/shell/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,10 @@ The Apache Software License, Version 2.0
- log4j-core-2.18.0.jar
- log4j-slf4j-impl-2.18.0.jar
- log4j-web-2.18.0.jar
* OpenTelemetry
- opentelemetry-api-1.34.1.jar
- opentelemetry-context-1.34.1.jar
- opentelemetry-extension-incubator-1.34.1-alpha.jar

* BookKeeper
- bookkeeper-common-allocator-4.16.4.jar
Expand Down
6 changes: 6 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
merlimat marked this conversation as resolved.
Show resolved Hide resolved
<scope>test</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-batch-discovery-triggerers</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
import org.apache.pulsar.common.naming.NamespaceBundle;
Expand Down Expand Up @@ -997,7 +998,8 @@ public void testLookupThrottlingForClientByClient() throws Exception {
// Using an AtomicReference in order to reset a new CountDownLatch
AtomicReference<CountDownLatch> latchRef = new AtomicReference<>();
latchRef.set(new CountDownLatch(1));
try (ConnectionPool pool = new ConnectionPool(conf, eventLoop, () -> new ClientCnx(conf, eventLoop) {
try (ConnectionPool pool = new ConnectionPool(InstrumentProvider.NOOP, conf, eventLoop,
() -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop) {
@Override
protected void handleLookupResponse(CommandLookupTopicResponse lookupResult) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -197,7 +198,7 @@ public void testLookupThrottlingForClientByBroker() throws Exception {
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(20, false,
new DefaultThreadFactory("test-pool", Thread.currentThread().isDaemon()));
ExecutorService executor = Executors.newFixedThreadPool(10);
try (ConnectionPool pool = new ConnectionPool(conf, eventLoop)) {
try (ConnectionPool pool = new ConnectionPool(InstrumentProvider.NOOP, conf, eventLoop)) {
final int totalConsumers = 20;
List<Future<?>> futures = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -99,7 +100,7 @@ public void testProxyProtocol() throws Exception {
ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
@Cleanup
PulsarClientImpl protocolClient = InjectedClientCnxClientBuilder.create(clientBuilder,
(conf, eventLoopGroup) -> new ClientCnx(conf, eventLoopGroup) {
(conf, eventLoopGroup) -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
public void channelActive(ChannelHandlerContext ctx) throws Exception {
byte[] bs = "PROXY TCP4 198.51.100.22 203.0.113.7 35646 80\r\n".getBytes();
ctx.writeAndFlush(Unpooled.copiedBuffer(bs));
Expand All @@ -124,7 +125,7 @@ public void testPubSubWhenSlowNetwork() throws Exception {
ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
@Cleanup
PulsarClientImpl protocolClient = InjectedClientCnxClientBuilder.create(clientBuilder,
(conf, eventLoopGroup) -> new ClientCnx(conf, eventLoopGroup) {
(conf, eventLoopGroup) -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Thread task = new Thread(() -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.util.netty.EventLoopUtil;

Expand All @@ -42,7 +43,7 @@ public static PulsarClientImpl create(final ClientBuilderImpl clientBuilder,
EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), conf.isEnableBusyWait(), threadFactory);

// Inject into ClientCnx.
ConnectionPool pool = new ConnectionPool(conf, eventLoopGroup,
ConnectionPool pool = new ConnectionPool(InstrumentProvider.NOOP, conf, eventLoopGroup,
() -> clientCnxFactory.generate(conf, eventLoopGroup));

return new InjectedClientCnxPulsarClientImpl(conf, eventLoopGroup, pool);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
Expand Down Expand Up @@ -137,7 +138,7 @@ private void doFindBrokerWithListenerName(boolean useHttp) throws Exception {
conf.setMaxLookupRedirects(10);

@Cleanup
LookupService lookupService = useHttp ? new HttpLookupService(conf, eventExecutors) :
LookupService lookupService = useHttp ? new HttpLookupService(InstrumentProvider.NOOP, conf, eventExecutors) :
new BinaryProtoLookupService((PulsarClientImpl) this.pulsarClient,
lookupUrl.toString(), "internal", false, this.executorService);
TopicName topicName = TopicName.get("persistent://public/default/test");
Expand Down Expand Up @@ -172,7 +173,7 @@ public void testHttpLookupRedirect() throws Exception {
conf.setMaxLookupRedirects(10);

@Cleanup
HttpLookupService lookupService = new HttpLookupService(conf, eventExecutors);
HttpLookupService lookupService = new HttpLookupService(InstrumentProvider.NOOP, conf, eventExecutors);
NamespaceService namespaceService = pulsar.getNamespaceService();

LookupResult lookupResult = new LookupResult(pulsar.getWebServiceAddress(), null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -68,7 +69,8 @@ protected void cleanup() throws Exception {
public void testSingleIpAddress() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("test"));
ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop);
ConnectionPool pool =
spyWithClassAndConstructorArgs(ConnectionPool.class, InstrumentProvider.NOOP, conf, eventLoop);
conf.setServiceUrl(serviceUrl);
PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool);

Expand Down Expand Up @@ -118,7 +120,7 @@ public void testSelectConnectionForSameProducer() throws Exception {
public void testDoubleIpAddress() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("test"));
ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop);
ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, InstrumentProvider.NOOP, conf, eventLoop);
conf.setServiceUrl(serviceUrl);
PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool);

Expand All @@ -143,7 +145,8 @@ public void testNoConnectionPool() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setConnectionsPerBroker(0);
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(8, false, new DefaultThreadFactory("test"));
ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop);
ConnectionPool pool =
spyWithClassAndConstructorArgs(ConnectionPool.class, InstrumentProvider.NOOP, conf, eventLoop);

InetSocketAddress brokerAddress =
InetSocketAddress.createUnresolved("127.0.0.1", brokerPort);
Expand All @@ -166,7 +169,8 @@ public void testEnableConnectionPool() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setConnectionsPerBroker(5);
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(8, false, new DefaultThreadFactory("test"));
ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop);
ConnectionPool pool =
spyWithClassAndConstructorArgs(ConnectionPool.class, InstrumentProvider.NOOP, conf, eventLoop);

InetSocketAddress brokerAddress =
InetSocketAddress.createUnresolved("127.0.0.1", brokerPort);
Expand Down Expand Up @@ -233,8 +237,10 @@ protected void doResolveAll(SocketAddress socketAddress, Promise promise) throws
}
};

ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop,
(Supplier<ClientCnx>) () -> new ClientCnx(conf, eventLoop), Optional.of(resolver));
ConnectionPool pool =
spyWithClassAndConstructorArgs(ConnectionPool.class, InstrumentProvider.NOOP, conf, eventLoop,
(Supplier<ClientCnx>) () -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop),
Optional.of(resolver));


ClientCnx cnx = pool.getConnection(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.protocol.ByteBufPair;
Expand Down Expand Up @@ -233,7 +234,7 @@ public void testTamperingMessageIsDetected() throws Exception {
// WHEN
// protocol message is created with checksum
ByteBufPair cmd = Commands.newSend(1, 1, 1, ChecksumType.Crc32c, msgMetadata, payload);
OpSendMsg op = OpSendMsg.create((MessageImpl<byte[]>) msgBuilder.getMessage(), cmd, 1, null);
OpSendMsg op = OpSendMsg.create(LatencyHistogram.NOOP, (MessageImpl<byte[]>) msgBuilder.getMessage(), cmd, 1, null);

// THEN
// the checksum validation passes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.MessageImpl.SchemaState;
import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.protocol.ByteBufPair;
Expand Down Expand Up @@ -499,7 +500,7 @@ public void testExpireIncompleteChunkMessage() throws Exception{
ByteBufPair cmd = Commands.newSend(producerId, 1, 1, ChecksumType.Crc32c, msgMetadata, payload);
MessageImpl msgImpl = ((MessageImpl<byte[]>) msg.getMessage());
msgImpl.setSchemaState(SchemaState.Ready);
OpSendMsg op = OpSendMsg.create(msgImpl, cmd, 1, null);
OpSendMsg op = OpSendMsg.create(LatencyHistogram.NOOP, msgImpl, cmd, 1, null);
producer.processOpSendMsg(op);

retryStrategically((test) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -811,7 +812,7 @@ public void testPreciseRegexpSubscribeDisabledTopicWatcher(boolean partitioned)
private PulsarClient createDelayWatchTopicsClient() throws Exception {
ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
return InjectedClientCnxClientBuilder.create(clientBuilder,
(conf, eventLoopGroup) -> new ClientCnx(conf, eventLoopGroup) {
(conf, eventLoopGroup) -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
public CompletableFuture<CommandWatchTopicListSuccess> newWatchTopicList(
BaseCommand command, long requestId) {
// Inject 2 seconds delay when sending command New Watch Topics.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.awaitility.Awaitility;

Expand Down Expand Up @@ -79,7 +80,7 @@ public static PulsarTestClient create(ClientBuilder clientBuilder) throws Pulsar
new DefaultThreadFactory("pulsar-test-client-io", Thread.currentThread().isDaemon()));

AtomicReference<Supplier<ClientCnx>> clientCnxSupplierReference = new AtomicReference<>();
ConnectionPool connectionPool = new ConnectionPool(clientConfigurationData, eventLoopGroup,
ConnectionPool connectionPool = new ConnectionPool(InstrumentProvider.NOOP, clientConfigurationData, eventLoopGroup,
() -> clientCnxSupplierReference.get().get());

return new PulsarTestClient(clientConfigurationData, eventLoopGroup, connectionPool,
Expand All @@ -101,7 +102,7 @@ private PulsarTestClient(ClientConfigurationData conf, EventLoopGroup eventLoopG
* @return new ClientCnx instance
*/
protected ClientCnx createClientCnx() {
return new ClientCnx(conf, eventLoopGroup) {
return new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
@Override
public int getRemoteEndpointProtocolVersion() {
return overrideRemoteEndpointProtocolVersion != 0
Expand Down
Loading
Loading