Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] PIP-342: OTel client metrics support #22179

Merged
merged 26 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
babad52
WIP: PIP-342: OTel client metrics support
merlimat Feb 29, 2024
8e184be
Addressed comments
merlimat Mar 4, 2024
4d0120b
Removed MetricsCardinality from API
merlimat Mar 5, 2024
252196e
Fixed tests code
merlimat Mar 5, 2024
4b31eb1
Merge remote-tracking branch 'apache/master' into client-otel
merlimat Mar 5, 2024
445231c
Ensure single-initialization of InstrumentProvider
merlimat Mar 5, 2024
e840d06
Added unit tests
merlimat Mar 6, 2024
6cc759a
Merge remote-tracking branch 'apache/master' into client-otel
merlimat Mar 6, 2024
a0cb78d
Addressed more comments
merlimat Mar 8, 2024
5af2018
Share the histogram objects
merlimat Mar 9, 2024
218f57b
Added javadoc and reset old metrics default
merlimat Mar 11, 2024
bd1fb3f
Added missing license header
merlimat Mar 11, 2024
59bf64b
Removed trailing spaces
merlimat Mar 11, 2024
857c0a3
Fixed checkstyle
merlimat Mar 11, 2024
e103369
Checkstyle
merlimat Mar 11, 2024
1f2d0c8
Checkstyle
merlimat Mar 11, 2024
413360c
Fixed spotbugs
merlimat Mar 11, 2024
345f240
Fixed compilation
merlimat Mar 11, 2024
0b60e7d
Fix
merlimat Mar 11, 2024
70dd347
Fixed test
merlimat Mar 12, 2024
8fe109f
Merge remote-tracking branch 'apache/master' into client-otel
merlimat Mar 13, 2024
d90f621
Fixed tests mock
merlimat Mar 14, 2024
22d46af
Fixed test
merlimat Mar 14, 2024
a9f3857
Attach subscription attribute to consumer metrics
merlimat Mar 14, 2024
980bae3
More test fix
merlimat Mar 14, 2024
545a7d3
Merge remote-tracking branch 'apache/master' into client-otel
merlimat Mar 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fixed tests code
  • Loading branch information
merlimat committed Mar 5, 2024
commit 252196e328e5c56da30987859185d44cfefda2a6
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,8 @@ public OpSendMsg createOpSendMsg() throws IOException {
// Because when invoke `ProducerImpl.processOpSendMsg` on flush,
// if `op.msg != null && isBatchMessagingEnabled()` checks true, it will call `batchMessageAndSend` to flush
// messageContainers before publishing this one-batch message.
op = OpSendMsg.create(producer, messages, cmd, messageMetadata.getSequenceId(), firstCallback,
batchAllocatedSizeBytes);
op = OpSendMsg.create(producer.rpcLatencyHistogram, messages, cmd, messageMetadata.getSequenceId(),
firstCallback, batchAllocatedSizeBytes);

// NumMessagesInBatch and BatchSizeByte will not be serialized to the binary cmd. It's just useful for the
// ProducerStats
Expand Down Expand Up @@ -314,7 +314,7 @@ public OpSendMsg createOpSendMsg() throws IOException {
messageMetadata.getUncompressedSize(), encryptedPayload.readableBytes());
}

OpSendMsg op = OpSendMsg.create(producer, messages, cmd, messageMetadata.getSequenceId(),
OpSendMsg op = OpSendMsg.create(producer.rpcLatencyHistogram, messages, cmd, messageMetadata.getSequenceId(),
messageMetadata.getHighestSequenceId(), firstCallback, batchAllocatedSizeBytes);

op.setNumMessagesInBatch(numMessagesInBatch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.ScheduledFuture;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import java.io.IOException;
import java.net.URI;
Expand Down Expand Up @@ -179,7 +178,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
private boolean errorState;

private final LatencyHistogram latencyHistogram;
private final LatencyHistogram rpcLatencyHistogram;
final LatencyHistogram rpcLatencyHistogram;
private final Counter publishedBytesCounter;
private final UpDownCounter pendingMessagesCounter;
private final UpDownCounter pendingBytesCounter;
Expand Down Expand Up @@ -745,9 +744,9 @@ private void serializeAndSendMessage(MessageImpl<?> msg,
if (msg.getSchemaState() == MessageImpl.SchemaState.Ready) {
ByteBufPair cmd = sendMessage(producerId, sequenceId, numMessages, messageId, msgMetadata,
encryptedPayload);
op = OpSendMsg.create(this, msg, cmd, sequenceId, callback);
op = OpSendMsg.create(rpcLatencyHistogram, msg, cmd, sequenceId, callback);
} else {
op = OpSendMsg.create(this, msg, null, sequenceId, callback);
op = OpSendMsg.create(rpcLatencyHistogram, msg, null, sequenceId, callback);
final MessageMetadata finalMsgMetadata = msgMetadata;
op.rePopulate = () -> {
if (msgMetadata.hasChunkId()) {
Expand Down Expand Up @@ -1455,7 +1454,7 @@ public ReferenceCounted touch(Object hint) {
}

protected static final class OpSendMsg {
ProducerImpl<?> producer;
LatencyHistogram rpcLatencyHistogram;
MessageImpl<?> msg;
List<MessageImpl<?>> msgs;
ByteBufPair cmd;
Expand All @@ -1475,7 +1474,7 @@ protected static final class OpSendMsg {
int chunkId = -1;

void initialize() {
producer = null;
rpcLatencyHistogram = null;
msg = null;
msgs = null;
cmd = null;
Expand All @@ -1495,11 +1494,11 @@ void initialize() {
chunkedMessageCtx = null;
}

static OpSendMsg create(ProducerImpl<?> producer, MessageImpl<?> msg, ByteBufPair cmd, long sequenceId,
static OpSendMsg create(LatencyHistogram rpcLatencyHistogram, MessageImpl<?> msg, ByteBufPair cmd, long sequenceId,
SendCallback callback) {
OpSendMsg op = RECYCLER.get();
op.initialize();
op.producer = producer;
op.rpcLatencyHistogram = rpcLatencyHistogram;
op.msg = msg;
op.cmd = cmd;
op.callback = callback;
Expand All @@ -1509,12 +1508,12 @@ static OpSendMsg create(ProducerImpl<?> producer, MessageImpl<?> msg, ByteBufPai
return op;
}

static OpSendMsg create(ProducerImpl<?> producer, List<MessageImpl<?>> msgs, ByteBufPair cmd, long sequenceId,
static OpSendMsg create(LatencyHistogram rpcLatencyHistogram, List<MessageImpl<?>> msgs, ByteBufPair cmd, long sequenceId,
SendCallback callback,
int batchAllocatedSize) {
OpSendMsg op = RECYCLER.get();
op.initialize();
op.producer = producer;
op.rpcLatencyHistogram = rpcLatencyHistogram;
op.msgs = msgs;
op.cmd = cmd;
op.callback = callback;
Expand All @@ -1528,12 +1527,12 @@ static OpSendMsg create(ProducerImpl<?> producer, List<MessageImpl<?>> msgs, Byt
return op;
}

static OpSendMsg create(ProducerImpl<?> producer, List<MessageImpl<?>> msgs, ByteBufPair cmd,
static OpSendMsg create(LatencyHistogram rpcLatencyHistogram, List<MessageImpl<?>> msgs, ByteBufPair cmd,
long lowestSequenceId,
long highestSequenceId, SendCallback callback, int batchAllocatedSize) {
OpSendMsg op = RECYCLER.get();
op.initialize();
op.producer = producer;
op.rpcLatencyHistogram = rpcLatencyHistogram;
op.msgs = msgs;
op.cmd = cmd;
op.callback = callback;
Expand Down Expand Up @@ -1585,9 +1584,9 @@ void sendComplete(final Exception e) {
}

if (e == null) {
producer.rpcLatencyHistogram.recordSuccess(now - this.lastSentAt);
rpcLatencyHistogram.recordSuccess(now - this.lastSentAt);
} else {
producer.rpcLatencyHistogram.recordFailure(now - this.lastSentAt);
rpcLatencyHistogram.recordFailure(now - this.lastSentAt);
}

callback.sendComplete(finalEx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,20 @@

public class LatencyHistogram {

// Used for tests
public final static LatencyHistogram NOOP = new LatencyHistogram(null, null, null, null, null) {
public void recordSuccess(long latencyNanos) {
}

public void recordFailure(long latencyNanos) {
}
};

private static final List<Double> latencyHistogramBuckets =
Lists.newArrayList(.0005, .001, .0025, .005, .01, .025, .05, .1, .25, .5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0);

private static final double NANOS = TimeUnit.SECONDS.toNanos(1);


private final Attributes successAttributes;

private final Attributes failedAttributes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.testng.Assert.assertEquals;
import com.google.common.collect.Lists;
import java.util.Arrays;
import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

Expand All @@ -39,7 +40,7 @@ public void createMockMessage() {
}

private ProducerImpl.OpSendMsg createDummyOpSendMsg() {
return ProducerImpl.OpSendMsg.create(message, null, 0L, null);
return ProducerImpl.OpSendMsg.create(LatencyHistogram.NOOP, message, null, 0L, null);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.testng.Assert.assertTrue;
import java.nio.ByteBuffer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.mockito.Mockito;
import org.testng.annotations.Test;
Expand All @@ -42,6 +43,7 @@ public void testChunkedMessageCtxDeallocate() {
for (int i = 0; i < totalChunks; i++) {
ProducerImpl.OpSendMsg opSendMsg =
ProducerImpl.OpSendMsg.create(
LatencyHistogram.NOOP,
MessageImpl.create(new MessageMetadata(), ByteBuffer.allocate(0), Schema.STRING, null),
null, 0, null);
opSendMsg.chunkedMessageCtx = ctx;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.ScheduledExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
Expand Down Expand Up @@ -180,7 +181,7 @@ public void testInitializeWithTimer() throws PulsarClientException {
ClientConfigurationData conf = new ClientConfigurationData();
@Cleanup("shutdownGracefully")
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("test"));
ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
ConnectionPool pool = Mockito.spy(new ConnectionPool(new InstrumentProvider(conf), conf, eventLoop));
conf.setServiceUrl("pulsar://localhost:6650");

HashedWheelTimer timer = new HashedWheelTimer();
Expand All @@ -205,7 +206,7 @@ public void testResourceCleanup() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("");
initializeEventLoopGroup(conf);
try (ConnectionPool connectionPool = new ConnectionPool(conf, eventLoopGroup)) {
try (ConnectionPool connectionPool = new ConnectionPool(new InstrumentProvider(conf), conf, eventLoopGroup)) {
assertThrows(() -> new PulsarClientImpl(conf, eventLoopGroup, connectionPool));
} finally {
// Externally passed eventLoopGroup should not be shutdown.
Expand Down
Loading