diff --git a/conf/broker.conf b/conf/broker.conf index 2b0365c4312c8..6f502a59c6ee8 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -452,3 +452,8 @@ exposeTopicLevelMetricsInPrometheus=true # Enable Functions Worker Service in Broker functionsWorkerEnabled=false + +### --- Broker Web Stats --- ### + +# Enable topic level metrics +exposePublisherStats=true diff --git a/conf/standalone.conf b/conf/standalone.conf index 69662877fb044..2beddf50da09a 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -391,3 +391,8 @@ webSocketConnectionsPerBroker=8 # Enable topic level metrics exposeTopicLevelMetricsInPrometheus=true + +### --- Broker Web Stats --- ### + +# Enable topic level metrics +exposePublisherStats=true diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index b35b440ccb055..e6c37ba162420 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -455,6 +455,10 @@ public class ServiceConfiguration implements PulsarConfiguration { /**** --- Functions --- ****/ private boolean functionsWorkerEnabled = false; + /**** --- Broker Web Stats --- ****/ + // If true, export publisher stats when returning topics stats from the admin rest api + private boolean exposePublisherStats = true; + public String getZookeeperServers() { return zookeeperServers; } @@ -1558,6 +1562,17 @@ public boolean isFunctionsWorkerEnabled() { return functionsWorkerEnabled; } + + /**** --- Broker Web Stats ---- ****/ + + public void setExposePublisherStats(boolean expose) { + this.exposePublisherStats = expose; + } + + public boolean exposePublisherStats() { + return exposePublisherStats; + } + public boolean isRunningStandalone() { return isRunningStandalone; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/BrokerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/BrokerStats.java index 01476929e8b8d..5fd57839f9df2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/BrokerStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/BrokerStats.java @@ -30,6 +30,8 @@ import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.StreamingOutput; +import java.io.OutputStream; import java.util.Collection; import java.util.Map; @@ -38,6 +40,21 @@ @Produces(MediaType.APPLICATION_JSON) public class BrokerStats extends BrokerStatsBase { + @GET + @Path("/topics") + @ApiOperation( + value = "Get all the topic stats by namesapce", + response = OutputStream.class, + responseContainer = "OutputStream") + // https://github.com/swagger-api/swagger-ui/issues/558 + // map + // support + // missing + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") }) + public StreamingOutput getTopics2() throws Exception { + return super.getTopics2(); + } + @GET @Path("/broker-resource-availability/{tenant}/{namespace}") @ApiOperation(value = "Broker availability report", notes = "This API gives the current broker availability in " diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java index c04c7a8f5c5f1..9fd02ae2f940a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java @@ -58,6 +58,7 @@ public class PulsarStats implements Closeable { private List metricsCollection; private List tempNonPersistentTopics; private final BrokerOperabilityMetrics brokerOperabilityMetrics; + private final boolean exposePublisherStats; private final ReentrantReadWriteLock bufferLock = new ReentrantReadWriteLock(); @@ -74,6 +75,8 @@ public PulsarStats(PulsarService pulsar) { this.brokerOperabilityMetrics = new BrokerOperabilityMetrics(pulsar.getConfiguration().getClusterName(), pulsar.getAdvertisedAddress()); this.tempNonPersistentTopics = Lists.newArrayList(); + + this.exposePublisherStats = pulsar.getConfiguration().exposePublisherStats(); } @Override @@ -129,7 +132,7 @@ public synchronized void updateStats( if (topic instanceof PersistentTopic) { try { topic.updateRates(nsStats, currentBundleStats, topicStatsStream, - clusterReplicationMetrics, namespaceName); + clusterReplicationMetrics, namespaceName, exposePublisherStats); } catch (Exception e) { log.error("Failed to generate topic stats for topic {}: {}", name, e.getMessage(), e); } @@ -151,7 +154,7 @@ public synchronized void updateStats( tempNonPersistentTopics.forEach(topic -> { try { topic.updateRates(nsStats, currentBundleStats, topicStatsStream, - clusterReplicationMetrics, namespaceName); + clusterReplicationMetrics, namespaceName, exposePublisherStats); } catch (Exception e) { log.error("Failed to generate topic stats for topic {}: {}", topic.getName(), e.getMessage(), e); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java new file mode 100644 index 0000000000000..a5614e8a4c59c --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.policies.data.PublisherStats; +import org.apache.pulsar.utils.StatsOutputStream; + +public class StreamingStats { + private StreamingStats() {} + + public static void writePublisherStats(StatsOutputStream statsStream, PublisherStats stats) { + statsStream.startObject(); + + statsStream.writePair("msgRateIn", stats.msgRateIn); + statsStream.writePair("msgThroughputIn", stats.msgThroughputIn); + statsStream.writePair("averageMsgSize", stats.averageMsgSize); + + statsStream.writePair("address", stats.getAddress()); + statsStream.writePair("producerId", stats.producerId); + statsStream.writePair("producerName", stats.getProducerName()); + statsStream.writePair("connectedSince", stats.getConnectedSince()); + if (stats.getClientVersion() != null) { + statsStream.writePair("clientVersion", stats.getClientVersion()); + } + + // add metadata + statsStream.startObject("metadata"); + if (stats.metadata != null && !stats.metadata.isEmpty()) { + stats.metadata.forEach(statsStream::writePair); + } + statsStream.endObject(); + + statsStream.endObject(); + } + + + public static void writeConsumerStats(StatsOutputStream statsStream, PulsarApi.CommandSubscribe.SubType subType, + ConsumerStats stats) { + // Populate consumer specific stats here + statsStream.startObject(); + + statsStream.writePair("address", stats.getAddress()); + statsStream.writePair("consumerName", stats.consumerName); + statsStream.writePair("availablePermits", stats.availablePermits); + statsStream.writePair("connectedSince", stats.getConnectedSince()); + statsStream.writePair("msgRateOut", stats.msgRateOut); + statsStream.writePair("msgThroughputOut", stats.msgThroughputOut); + statsStream.writePair("msgRateRedeliver", stats.msgRateRedeliver); + + if (PulsarApi.CommandSubscribe.SubType.Shared.equals(subType)) { + statsStream.writePair("unackedMessages", stats.unackedMessages); + statsStream.writePair("blockedConsumerOnUnackedMsgs", stats.blockedConsumerOnUnackedMsgs); + } + if (stats.getClientVersion() != null) { + statsStream.writePair("clientVersion", stats.getClientVersion()); + } + + // add metadata + statsStream.startObject("metadata"); + if (stats.metadata != null && !stats.metadata.isEmpty()) { + stats.metadata.forEach(statsStream::writePair); + } + statsStream.endObject(); + + statsStream.endObject(); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index fceb858a1af43..fbf3c65de3fc1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -40,7 +40,7 @@ public interface Topic { - public interface PublishContext { + interface PublishContext { default String getProducerName() { return null; @@ -117,7 +117,7 @@ CompletableFuture subscribe(ServerCnx cnx, String subscriptionName, lo void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats, StatsOutputStream topicStatsStream, ClusterReplicationMetrics clusterReplicationMetrics, - String namespaceName); + String namespaceName, boolean hydratePublishers); Subscription getSubscription(String subscription); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 790197597b98c..e45016c309216 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -61,6 +61,7 @@ import org.apache.pulsar.broker.service.Producer; import org.apache.pulsar.broker.service.Replicator; import org.apache.pulsar.broker.service.ServerCnx; +import org.apache.pulsar.broker.service.StreamingStats; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; @@ -681,7 +682,7 @@ public String getName() { } public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats, StatsOutputStream topicStatsStream, - ClusterReplicationMetrics replStats, String namespace) { + ClusterReplicationMetrics replStats, String namespace, boolean hydratePublishers) { TopicStats topicStats = threadLocalTopicStats.get(); topicStats.reset(); @@ -692,22 +693,25 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats bundleStats.producerCount += producers.size(); topicStatsStream.startObject(topic); + topicStatsStream.startList("publishers"); producers.forEach(producer -> { producer.updateRates(); - PublisherStats PublisherStats = producer.getStats(); + PublisherStats publisherStats = producer.getStats(); - topicStats.aggMsgRateIn += PublisherStats.msgRateIn; - topicStats.aggMsgThroughputIn += PublisherStats.msgThroughputIn; + topicStats.aggMsgRateIn += publisherStats.msgRateIn; + topicStats.aggMsgThroughputIn += publisherStats.msgThroughputIn; if (producer.isRemote()) { - topicStats.remotePublishersStats.put(producer.getRemoteCluster(), PublisherStats); + topicStats.remotePublishersStats.put(producer.getRemoteCluster(), publisherStats); } - }); - // Creating publishers object for backward compatibility - topicStatsStream.startList("publishers"); + if (hydratePublishers) { + StreamingStats.writePublisherStats(topicStatsStream, publisherStats); + } + }); topicStatsStream.endList(); + // Start replicator stats topicStatsStream.startObject("replication"); nsStats.replicatorCount += topicStats.remotePublishersStats.size(); @@ -744,24 +748,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats subMsgRateRedeliver += consumerStats.msgRateRedeliver; // Populate consumer specific stats here - topicStatsStream.startObject(); - topicStatsStream.writePair("address", consumerStats.getAddress()); - topicStatsStream.writePair("consumerName", consumerStats.consumerName); - topicStatsStream.writePair("availablePermits", consumerStats.availablePermits); - topicStatsStream.writePair("connectedSince", consumerStats.getConnectedSince()); - topicStatsStream.writePair("msgRateOut", consumerStats.msgRateOut); - topicStatsStream.writePair("msgThroughputOut", consumerStats.msgThroughputOut); - topicStatsStream.writePair("msgRateRedeliver", consumerStats.msgRateRedeliver); - - if (SubType.Shared.equals(subscription.getType())) { - topicStatsStream.writePair("unackedMessages", consumerStats.unackedMessages); - topicStatsStream.writePair("blockedConsumerOnUnackedMsgs", - consumerStats.blockedConsumerOnUnackedMsgs); - } - if (consumerStats.getClientVersion() != null) { - topicStatsStream.writePair("clientVersion", consumerStats.getClientVersion()); - } - topicStatsStream.endObject(); + StreamingStats.writeConsumerStats(topicStatsStream, subscription.getType(), consumerStats); } // Close Consumer stats diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 0240ffdf2877d..9c59bd983984d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -75,6 +75,7 @@ import org.apache.pulsar.broker.service.Producer; import org.apache.pulsar.broker.service.Replicator; import org.apache.pulsar.broker.service.ServerCnx; +import org.apache.pulsar.broker.service.StreamingStats; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; @@ -1095,7 +1096,7 @@ public ManagedLedger getManagedLedger() { } public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats, StatsOutputStream topicStatsStream, - ClusterReplicationMetrics replStats, String namespace) { + ClusterReplicationMetrics replStats, String namespace, boolean hydratePublishers) { TopicStatsHelper topicStatsHelper = threadLocalTopicStats.get(); topicStatsHelper.reset(); @@ -1106,6 +1107,8 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats bundleStats.producerCount += producers.size(); topicStatsStream.startObject(topic); + // start publisher stats + topicStatsStream.startList("publishers"); producers.forEach(producer -> { producer.updateRates(); PublisherStats publisherStats = producer.getStats(); @@ -1116,10 +1119,12 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats if (producer.isRemote()) { topicStatsHelper.remotePublishersStats.put(producer.getRemoteCluster(), publisherStats); } - }); - // Creating publishers object for backward compatibility - topicStatsStream.startList("publishers"); + // Populate consumer specific stats here + if (hydratePublishers) { + StreamingStats.writePublisherStats(topicStatsStream, publisherStats); + } + }); topicStatsStream.endList(); // Start replicator stats @@ -1215,25 +1220,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats subMsgThroughputOut += consumerStats.msgThroughputOut; subMsgRateRedeliver += consumerStats.msgRateRedeliver; - // Populate consumer specific stats here - topicStatsStream.startObject(); - topicStatsStream.writePair("address", consumerStats.getAddress()); - topicStatsStream.writePair("consumerName", consumerStats.consumerName); - topicStatsStream.writePair("availablePermits", consumerStats.availablePermits); - topicStatsStream.writePair("connectedSince", consumerStats.getConnectedSince()); - topicStatsStream.writePair("msgRateOut", consumerStats.msgRateOut); - topicStatsStream.writePair("msgThroughputOut", consumerStats.msgThroughputOut); - topicStatsStream.writePair("msgRateRedeliver", consumerStats.msgRateRedeliver); - - if (SubType.Shared.equals(subscription.getType())) { - topicStatsStream.writePair("unackedMessages", consumerStats.unackedMessages); - topicStatsStream.writePair("blockedConsumerOnUnackedMsgs", - consumerStats.blockedConsumerOnUnackedMsgs); - } - if (consumerStats.getClientVersion() != null) { - topicStatsStream.writePair("clientVersion", consumerStats.getClientVersion()); - } - topicStatsStream.endObject(); + StreamingStats.writeConsumerStats(topicStatsStream, subscription.getType(), consumerStats); } // Close Consumer stats diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java index 86c0317b80cd6..257adb4238b94 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java @@ -80,7 +80,7 @@ public JsonArray getMBeans() throws PulsarAdminException { @Override public JsonObject getTopics() throws PulsarAdminException { try { - String json = request(adminV2BrokerStats.path("/destinations")).get(String.class); + String json = request(adminV2BrokerStats.path("/topics")).get(String.class); return new Gson().fromJson(json, JsonObject.class); } catch (Exception e) { throw getApiException(e);