When a topic has a large number of producers or consumers (over 1k), querying the pulsarAdmin.topics().getPartitionedStats()
interface is slow and the response size is also large.
As a result, it's essential to give users the option of querying producer and consumer information.
Add the API for org.apache.pulsar.client.admin.Topics
CompletableFuture<PartitionedTopicStats> getPartitionedStatsAsync(
String topic, boolean perPartition, GetStatsOptions getStatsOptions);
CompletableFuture<TopicStats> getStatsAsync(String topic, GetStatsOptions getStatsOptions);
None.
Implement the getPartitionedStatsAsync
method, and add the excludePublishers
and excludeConsumers
parameters to {tenant}/{namespace}/{topic}/partitioned-stats
API in PersistentTopics
and NonPersistentTopics
.
Add two fields for org.apache.pulsar.client.admin.GetStatsOptions
@Data
@Builder
public class GetStatsOptions {
/**
* Whether to exclude publishers.
*/
private final boolean excludePublishers;
/**
* Whether to exclude consumers.
*/
private final boolean excludeConsumers;
}
Implement the getPartitionedStatsAsync
and getStatsAsync
interface for org.apache.pulsar.client.admin.internal.TopicsImpl
@Override
public CompletableFuture<PartitionedTopicStats> getPartitionedStatsAsync(String topic, boolean perPartition, GetStatsOptions getStatsOptions){
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "partitioned-stats");
path = path.queryParam("perPartition", perPartition)
.queryParam("getPreciseBacklog", getStatsOptions.isGetPreciseBacklog())
.queryParam("subscriptionBacklogSize", getStatsOptions.isSubscriptionBacklogSize())
.queryParam("getEarliestTimeInBacklog", getStatsOptions.isGetEarliestTimeInBacklog());
.queryParam("excludePublishers", getStatsOptions.isExcludePublishers())
.queryParam("excludeConsumers", getStatsOptions.isExcludeConsumers());
}
@Override
public CompletableFuture<TopicStats> getStatsAsync(String topic, GetStatsOptions getStatsOptions){
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "stats")
.queryParam("getPreciseBacklog", getStatsOptions.isGetPreciseBacklog())
.queryParam("subscriptionBacklogSize", getStatsOptions.isSubscriptionBacklogSize())
.queryParam("getEarliestTimeInBacklog", getStatsOptions.isGetEarliestTimeInBacklog());
.queryParam("excludePublishers",getStatsOptions.isExcludePublishers())
.queryParam("excludeConsumers",getStatsOptions.isExcludeConsumers());
}
Add the excludePublishers
and excludeConsumers
parameters to {tenant}/{namespace}/{topic}/partitioned-stats
API
@GET
@Path("{tenant}/{namespace}/{topic}/partitioned-stats")
public void getPartitionedStats(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Get per partition stats")
@QueryParam("perPartition") @DefaultValue("true") boolean perPartition,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "If return precise backlog or imprecise backlog")
@QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
@ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful "
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("true") boolean subscriptionBacklogSize,
@ApiParam(value = "If return the earliest time in backlog")
@QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog,
@ApiParam(value = "If exclude the publishers")
@QueryParam("excludePublishers") @DefaultValue("false") boolean excludePublishers,
@ApiParam(value = "If exclude the consumers")
@QueryParam("excludeConsumers") @DefaultValue("false") boolean excludeConsumers)
Add the excludePublishers
and excludeConsumers
parameters to {tenant}/{namespace}/{topic}/stats
API
@GET
@Path("{tenant}/{namespace}/{topic}/stats")
public void getStats(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "If return precise backlog or imprecise backlog")
@QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
@ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful "
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("true") boolean subscriptionBacklogSize,
@ApiParam(value = "If return time of the earliest message in backlog")
@QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog,
@ApiParam(value = "If exclude the publishers"),
@QueryParam("excludePublishers") @DefaultValue("false") boolean excludePublishers,
@ApiParam(value = "If exclude the consumers")
@QueryParam("excludeConsumers") @DefaultValue("false") boolean excludeConsumers)
Add a new method for org.apache.pulsar.broker.service.Topic
CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions getStatsOptions);
@Data
@Builder
public class GetStatsOptions {
/**
* Set to true to get precise backlog, Otherwise get imprecise backlog.
*/
private final boolean getPreciseBacklog;
/**
* Whether to get backlog size for each subscription.
*/
private final boolean subscriptionBacklogSize;
/**
* Whether to get the earliest time in backlog.
*/
private final boolean getEarliestTimeInBacklog;
/**
* Whether to exclude publishers.
*/
private final boolean excludePublishers;
/**
* Whether to exclude consumers.
*/
private final boolean excludeConsumers;
}
Add the following logic in org.apache.pulsar.broker.service.persistent.PersistentTopic.asyncGetStats
and org.apache.pulsar.broker.service.persistent.PersistentSubscription.getStats
:
if (!excludePublishers){
stats.addPublisher(publisherStats);
}
if (!excludeConsumers){
subStats.consumers.add(consumerStats);
}
- Mailing List discussion thread: https://lists.apache.org/thread/c92043zq6lyrsd5z1hnln48mx858n7vj
- Mailing List voting thread: https://lists.apache.org/thread/hjw3y7h5vd0x7st6zslj3btjcd6yf1lx