Skip to content

Commit

Permalink
[feat][broker] PIP-264: Add OpenTelemetry broker replicator metrics (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
dragosvictor authored Jun 25, 2024
1 parent 7dba98b commit f323342
Show file tree
Hide file tree
Showing 22 changed files with 539 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.OpenTelemetryConsumerStats;
import org.apache.pulsar.broker.stats.OpenTelemetryProducerStats;
import org.apache.pulsar.broker.stats.OpenTelemetryReplicatorStats;
import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
Expand Down Expand Up @@ -260,6 +261,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private OpenTelemetryTopicStats openTelemetryTopicStats;
private OpenTelemetryConsumerStats openTelemetryConsumerStats;
private OpenTelemetryProducerStats openTelemetryProducerStats;
private OpenTelemetryReplicatorStats openTelemetryReplicatorStats;

private TransactionMetadataStoreService transactionMetadataStoreService;
private TransactionBufferProvider transactionBufferProvider;
Expand Down Expand Up @@ -678,6 +680,10 @@ public CompletableFuture<Void> closeAsync() {
brokerClientSharedTimer.stop();
monotonicSnapshotClock.close();

if (openTelemetryReplicatorStats != null) {
openTelemetryReplicatorStats.close();
openTelemetryReplicatorStats = null;
}
if (openTelemetryProducerStats != null) {
openTelemetryProducerStats.close();
openTelemetryProducerStats = null;
Expand Down Expand Up @@ -834,6 +840,7 @@ public void start() throws PulsarServerException {
openTelemetryTopicStats = new OpenTelemetryTopicStats(this);
openTelemetryConsumerStats = new OpenTelemetryConsumerStats(this);
openTelemetryProducerStats = new OpenTelemetryProducerStats(this);
openTelemetryReplicatorStats = new OpenTelemetryReplicatorStats(this);

localMetadataSynchronizer = StringUtils.isNotBlank(config.getMetadataSyncEventTopic())
? new PulsarMetadataEventSynchronizer(this, config.getMetadataSyncEventTopic())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;

import com.google.common.annotations.VisibleForTesting;
import io.opentelemetry.api.common.Attributes;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -42,6 +43,7 @@
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.StringInterner;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -55,6 +57,7 @@ public abstract class AbstractReplicator implements Replicator {
protected final PulsarClientImpl replicationClient;
protected final PulsarClientImpl client;
protected String replicatorId;
@Getter
protected final Topic localTopic;

protected volatile ProducerImpl producer;
Expand All @@ -74,6 +77,10 @@ public abstract class AbstractReplicator implements Replicator {
@Getter
protected volatile State state = State.Disconnected;

private volatile Attributes attributes = null;
private static final AtomicReferenceFieldUpdater<AbstractReplicator, Attributes> ATTRIBUTES_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(AbstractReplicator.class, Attributes.class, "attributes");

public enum State {
/**
* This enum has two main meanings:
Expand Down Expand Up @@ -136,6 +143,17 @@ public AbstractReplicator(String localCluster, Topic localTopic, String remoteCl

protected abstract void disableReplicatorRead();

/**
 * Reports whether this replicator currently has a connected producer.
 *
 * <p>The {@code producer} field is copied into a local first, so the null check and the
 * {@code isConnected()} call operate on the same reference.
 *
 * @return {@code false} when no producer is set, otherwise the producer's own connection status
 */
@Override
public boolean isConnected() {
    var current = this.producer;
    if (current == null) {
        return false;
    }
    return current.isConnected();
}

/**
 * Returns the replication delay reported by the current producer, in milliseconds.
 *
 * @return the producer's {@code getDelayInMillis()} value, or {@code 0} when no producer is set
 */
public long getReplicationDelayMs() {
    var current = this.producer;
    if (current == null) {
        return 0;
    }
    return current.getDelayInMillis();
}

/**
 * Returns the remote cluster associated with this replicator.
 *
 * @return the value of the {@code remoteCluster} field
 */
public String getRemoteCluster() {
    return this.remoteCluster;
}
Expand Down Expand Up @@ -476,4 +494,26 @@ protected ImmutablePair<Boolean, State> compareSetAndGetState(State expect, Stat
/**
 * Tells whether this replicator is terminating or has already terminated.
 *
 * @return {@code true} when the state is {@code State.Terminating} or {@code State.Terminated}
 */
public boolean isTerminated() {
    if (state == State.Terminating) {
        return true;
    }
    return state == State.Terminated;
}

/**
 * Returns the OpenTelemetry attributes describing this replicator, building them on first use.
 *
 * <p>The attribute set contains the local topic's domain, tenant, namespace and partitioned
 * topic name, the partition index when the topic is partitioned, and the remote cluster name.
 * The result is stored through {@code ATTRIBUTES_UPDATER}; once a non-null value is stored,
 * the update function keeps returning it instead of building a new one.
 *
 * @return the stored attributes, created by this call if none were stored yet
 */
public Attributes getAttributes() {
    if (attributes == null) {
        return ATTRIBUTES_UPDATER.updateAndGet(this, existing -> {
            if (existing == null) {
                var name = TopicName.get(getLocalTopic().getName());
                var attrs = Attributes.builder();
                attrs.put(OpenTelemetryAttributes.PULSAR_DOMAIN, name.getDomain().toString());
                attrs.put(OpenTelemetryAttributes.PULSAR_TENANT, name.getTenant());
                attrs.put(OpenTelemetryAttributes.PULSAR_NAMESPACE, name.getNamespace());
                attrs.put(OpenTelemetryAttributes.PULSAR_TOPIC, name.getPartitionedTopicName());
                // The partition index is only recorded for partitioned topics.
                if (name.isPartitioned()) {
                    attrs.put(OpenTelemetryAttributes.PULSAR_PARTITION_INDEX, name.getPartitionIndex());
                }
                attrs.put(OpenTelemetryAttributes.PULSAR_REPLICATION_REMOTE_CLUSTER_NAME, getRemoteCluster());
                return attrs.build();
            }
            // A value is already stored: keep it.
            return existing;
        });
    }
    return attributes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,14 @@ public void incrementPublishCount(Producer producer, int numOfMessages, long msg
if (isSystemTopic()) {
systemTopicBytesInCounter.add(msgSizeInBytes);
}

if (producer.isRemote()) {
var remoteClusterName = producer.getRemoteCluster();
var replicator = getReplicators().get(remoteClusterName);
if (replicator != null) {
replicator.getStats().incrementPublishCount(numOfMessages, msgSizeInBytes);
}
}
}

private void handlePublishThrottling(Producer producer, int numOfMessages, long msgSizeInBytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ public interface Replicator {

void startProducer();

ReplicatorStatsImpl getStats();
Topic getLocalTopic();

ReplicatorStatsImpl computeStats();

CompletableFuture<Void> terminate();

Expand All @@ -53,4 +55,6 @@ default Optional<DispatchRateLimiter> getRateLimiter() {
long getNumberOfEntriesInBacklog();

boolean isTerminated();

ReplicatorStatsImpl getStats();
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ public void sendMessage(Entry entry) {
}

msgOut.recordEvent(headersAndPayload.readableBytes());
stats.incrementMsgOutCounter();
stats.incrementBytesOutCounter(headersAndPayload.readableBytes());

msg.setReplicatedFrom(localCluster);

Expand All @@ -129,6 +131,7 @@ public void sendMessage(Entry entry) {
replicatorId);
}
msgDrop.recordEvent();
stats.incrementMsgDropCount();
entry.release();
}
}
Expand All @@ -143,11 +146,11 @@ public void updateRates() {
}

@Override
public NonPersistentReplicatorStatsImpl getStats() {
stats.connected = producer != null && producer.isConnected();
stats.replicationDelayInSeconds = getReplicationDelayInSeconds();

public NonPersistentReplicatorStatsImpl computeStats() {
ProducerImpl producer = this.producer;
stats.connected = isConnected();
stats.replicationDelayInSeconds = TimeUnit.MILLISECONDS.toSeconds(getReplicationDelayMs());

if (producer != null) {
stats.outboundConnection = producer.getConnectionId();
stats.outboundConnectedSince = producer.getConnectedSince();
Expand All @@ -159,11 +162,9 @@ public NonPersistentReplicatorStatsImpl getStats() {
return stats;
}

private long getReplicationDelayInSeconds() {
if (producer != null) {
return TimeUnit.MILLISECONDS.toSeconds(producer.getDelayInMillis());
}
return 0L;
/**
 * Returns the stats object held by this replicator as-is; no fields are refreshed here.
 *
 * @return the {@code stats} field
 */
@Override
public NonPersistentReplicatorStatsImpl getStats() {
    return this.stats;
}

private static final class ProducerSendCallback implements SendCallback {
Expand Down Expand Up @@ -256,10 +257,4 @@ public long getNumberOfEntriesInBacklog() {
protected void disableReplicatorRead() {
// No-op
// NOTE(review): presumably nothing to disable because this replicator has no cursor to
// deactivate - confirm against the persistent implementation.
}

@Override
public boolean isConnected() {
ProducerImpl<?> producer = this.producer;
return producer != null && producer.isConnected();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ public CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions
});

replicators.forEach((cluster, replicator) -> {
NonPersistentReplicatorStatsImpl replicatorStats = replicator.getStats();
NonPersistentReplicatorStatsImpl replicatorStats = replicator.computeStats();

// Add incoming msg rates
PublisherStatsImpl pubStats = remotePublishersStats.get(replicator.getRemoteCluster());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ protected boolean replicateEntries(List<Entry> entries) {
msg.getMessageBuilder().clearTxnidMostBits();
msg.getMessageBuilder().clearTxnidLeastBits();
msgOut.recordEvent(headersAndPayload.readableBytes());
stats.incrementMsgOutCounter();
stats.incrementBytesOutCounter(headersAndPayload.readableBytes());
// Increment pending messages for messages produced locally
PENDING_MESSAGES_UPDATER.incrementAndGet(this);
producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import lombok.Getter;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
Expand Down Expand Up @@ -107,7 +109,8 @@ public abstract class PersistentReplicator extends AbstractReplicator
// for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold
private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;

private final ReplicatorStatsImpl stats = new ReplicatorStatsImpl();
@Getter
protected final ReplicatorStatsImpl stats = new ReplicatorStatsImpl();

protected volatile boolean fetchSchemaInProgress = false;

Expand All @@ -118,7 +121,7 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man
super(localCluster, localTopic, remoteCluster, remoteTopic, localTopic.getReplicatorPrefix(),
brokerService, replicationClient);
this.topic = localTopic;
this.cursor = cursor;
this.cursor = Objects.requireNonNull(cursor);
this.expiryMonitor = new PersistentMessageExpiryMonitor(localTopic,
Codec.decode(cursor.getName()), cursor, null);
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
Expand Down Expand Up @@ -186,12 +189,14 @@ public long getNumberOfEntriesInBacklog() {
return cursor.getNumberOfEntriesInBacklog(true);
}

/**
 * Returns the total number of expired messages as reported by the expiry monitor.
 *
 * @return the value of {@code expiryMonitor.getTotalMessageExpired()}
 */
public long getMessageExpiredCount() {
    return this.expiryMonitor.getTotalMessageExpired();
}

@Override
protected void disableReplicatorRead() {
if (this.cursor != null) {
// deactivate cursor after successfully close the producer
this.cursor.setInactive();
}
// deactivate the cursor after successfully closing the producer
this.cursor.setInactive();
}

/**
Expand Down Expand Up @@ -330,12 +335,10 @@ protected CompletableFuture<SchemaInfo> getSchemaInfo(MessageImpl msg) throws Ex
}

public void updateCursorState() {
if (this.cursor != null) {
if (producer != null && producer.isConnected()) {
this.cursor.setActive();
} else {
this.cursor.setInactive();
}
if (isConnected()) {
cursor.setActive();
} else {
cursor.setInactive();
}
}

Expand Down Expand Up @@ -595,10 +598,10 @@ public void updateRates() {
stats.msgRateExpired = msgExpired.getRate() + expiryMonitor.getMessageExpiryRate();
}

public ReplicatorStatsImpl getStats() {
stats.replicationBacklog = cursor != null ? cursor.getNumberOfEntriesInBacklog(false) : 0;
stats.connected = producer != null && producer.isConnected();
stats.replicationDelayInSeconds = getReplicationDelayInSeconds();
public ReplicatorStatsImpl computeStats() {
stats.replicationBacklog = cursor.getNumberOfEntriesInBacklog(false);
stats.connected = isConnected();
stats.replicationDelayInSeconds = TimeUnit.MILLISECONDS.toSeconds(getReplicationDelayMs());

ProducerImpl producer = this.producer;
if (producer != null) {
Expand All @@ -616,13 +619,6 @@ public void updateMessageTTL(int messageTTLInSeconds) {
this.messageTTLInSeconds = messageTTLInSeconds;
}

private long getReplicationDelayInSeconds() {
if (producer != null) {
return TimeUnit.MILLISECONDS.toSeconds(producer.getDelayInMillis());
}
return 0L;
}

@Override
public boolean expireMessages(int messageTTLInSeconds) {
if ((cursor.getNumberOfEntriesInBacklog(false) == 0)
Expand Down Expand Up @@ -691,12 +687,6 @@ protected void checkReplicatedSubscriptionMarker(Position position, MessageImpl<
}
}

@Override
public boolean isConnected() {
ProducerImpl<?> producer = this.producer;
return producer != null && producer.isConnected();
}

@Override
protected void doReleaseResources() {
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2353,7 +2353,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
}

// Update replicator stats
ReplicatorStatsImpl rStat = replicator.getStats();
ReplicatorStatsImpl rStat = replicator.computeStats();

// Add incoming msg rates
PublisherStatsImpl pubStats = topicStatsHelper.remotePublishersStats.get(replicator.getRemoteCluster());
Expand Down Expand Up @@ -2636,7 +2636,7 @@ public CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions
});

replicators.forEach((cluster, replicator) -> {
ReplicatorStatsImpl replicatorStats = replicator.getStats();
ReplicatorStatsImpl replicatorStats = replicator.computeStats();

// Add incoming msg rates
PublisherStatsImpl pubStats = remotePublishersStats.get(replicator.getRemoteCluster());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ protected boolean replicateEntries(List<Entry> entries) {
dispatchRateLimiter.ifPresent(rateLimiter -> rateLimiter.consumeDispatchQuota(1, entry.getLength()));

msgOut.recordEvent(headersAndPayload.readableBytes());
stats.incrementMsgOutCounter();
stats.incrementBytesOutCounter(headersAndPayload.readableBytes());

msg.setReplicatedFrom(localCluster);

Expand Down
Loading

0 comments on commit f323342

Please sign in to comment.