
When a topic has a large number of producers or consumers (more than 1k), calling pulsarAdmin.topics().getPartitionedStats() is slow and the response is large, because the stats list every producer and consumer. Users therefore need an option to exclude producer and consumer information from the stats response.


In Scope

Add the following methods to org.apache.pulsar.client.admin.Topics:

CompletableFuture<PartitionedTopicStats> getPartitionedStatsAsync(
        String topic, boolean perPartition, GetStatsOptions getStatsOptions);

CompletableFuture<TopicStats> getStatsAsync(String topic, GetStatsOptions getStatsOptions);

Out of Scope


High Level Design

Implement the getPartitionedStatsAsync method, and add the excludePublishers and excludeConsumers parameters to {tenant}/{namespace}/{topic}/partitioned-stats API in PersistentTopics and NonPersistentTopics.

Detailed Design

Design & Implementation Details

Add two fields to org.apache.pulsar.client.admin.GetStatsOptions

public class GetStatsOptions {
    /** Whether to exclude publishers. */
    private final boolean excludePublishers;

    /** Whether to exclude consumers. */
    private final boolean excludeConsumers;
}
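
As a rough illustration of how a caller might set the two new flags, the sketch below models the options object with a hand-written builder. The class `StatsOptionsSketch` and its builder are hypothetical stand-ins, not the real `GetStatsOptions`; only the two field names come from this proposal, and the `false` defaults are an assumption chosen to match the `@DefaultValue("false")` used on the REST parameters.

```java
// Hypothetical stand-in for GetStatsOptions; only the two flag names come from this proposal.
final class StatsOptionsSketch {
    private final boolean excludePublishers;
    private final boolean excludeConsumers;

    private StatsOptionsSketch(Builder b) {
        this.excludePublishers = b.excludePublishers;
        this.excludeConsumers = b.excludeConsumers;
    }

    boolean isExcludePublishers() { return excludePublishers; }
    boolean isExcludeConsumers() { return excludeConsumers; }

    static Builder builder() { return new Builder(); }

    static final class Builder {
        // Assumed defaults: both flags are false, so callers that set nothing still get full stats.
        private boolean excludePublishers = false;
        private boolean excludeConsumers = false;

        Builder excludePublishers(boolean v) { this.excludePublishers = v; return this; }
        Builder excludeConsumers(boolean v) { this.excludeConsumers = v; return this; }
        StatsOptionsSketch build() { return new StatsOptionsSketch(this); }
    }
}
```

A caller that only wants subscription-level numbers on a topic with many producers could build the options with `excludePublishers(true)` and leave the consumer flag untouched.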

Implement the getPartitionedStatsAsync and getStatsAsync methods in org.apache.pulsar.client.admin.internal.TopicsImpl

public CompletableFuture<PartitionedTopicStats> getPartitionedStatsAsync(String topic, boolean perPartition, GetStatsOptions getStatsOptions) {
    TopicName tn = validateTopic(topic);
    WebTarget path = topicPath(tn, "partitioned-stats");
    // Forward every option, including the two new exclusion flags, as query parameters.
    path = path.queryParam("perPartition", perPartition)
            .queryParam("getPreciseBacklog", getStatsOptions.isGetPreciseBacklog())
            .queryParam("subscriptionBacklogSize", getStatsOptions.isSubscriptionBacklogSize())
            .queryParam("getEarliestTimeInBacklog", getStatsOptions.isGetEarliestTimeInBacklog())
            .queryParam("excludePublishers", getStatsOptions.isExcludePublishers())
            .queryParam("excludeConsumers", getStatsOptions.isExcludeConsumers());
    // ... (the rest of the method, which sends the request, is not shown in the source)
}

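public CompletableFuture<TopicStats> getStatsAsync(String topic, GetStatsOptions getStatsOptions) {
    TopicName tn = validateTopic(topic);
    // The stats endpoint gains the same two exclusion flags as partitioned-stats.
    WebTarget path = topicPath(tn, "stats")
            .queryParam("getPreciseBacklog", getStatsOptions.isGetPreciseBacklog())
            .queryParam("subscriptionBacklogSize", getStatsOptions.isSubscriptionBacklogSize())
            .queryParam("getEarliestTimeInBacklog", getStatsOptions.isGetEarliestTimeInBacklog())
            .queryParam("excludePublishers", getStatsOptions.isExcludePublishers())
            .queryParam("excludeConsumers", getStatsOptions.isExcludeConsumers());
    // ... (the rest of the method, which sends the request, is not shown in the source)
}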
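
To make the client-side mapping concrete, here is a minimal, self-contained sketch of the query string that the partitioned-stats call would carry once the two flags are forwarded. `StatsQuerySketch` is a hypothetical helper written for illustration only; it does not use `WebTarget` and is not part of `TopicsImpl`. The parameter names are the ones listed in this proposal.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of TopicsImpl; it only illustrates the parameter mapping.
final class StatsQuerySketch {
    static String partitionedStatsQuery(boolean perPartition,
                                        boolean getPreciseBacklog,
                                        boolean subscriptionBacklogSize,
                                        boolean getEarliestTimeInBacklog,
                                        boolean excludePublishers,
                                        boolean excludeConsumers) {
        // LinkedHashMap keeps insertion order, so the output is deterministic.
        Map<String, Boolean> params = new LinkedHashMap<>();
        params.put("perPartition", perPartition);
        params.put("getPreciseBacklog", getPreciseBacklog);
        params.put("subscriptionBacklogSize", subscriptionBacklogSize);
        params.put("getEarliestTimeInBacklog", getEarliestTimeInBacklog);
        params.put("excludePublishers", excludePublishers);
        params.put("excludeConsumers", excludeConsumers);

        // Join as name=value pairs after the endpoint's path segment.
        StringJoiner joiner = new StringJoiner("&", "partitioned-stats?", "");
        params.forEach((name, value) -> joiner.add(name + "=" + value));
        return joiner.toString();
    }
}
```

The sketch always sends all six parameters explicitly, mirroring the `queryParam` chain above, rather than omitting those that equal the server defaults.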

Add the excludePublishers and excludeConsumers parameters to {tenant}/{namespace}/{topic}/partitioned-stats API

public void getPartitionedStats(
        @Suspended final AsyncResponse asyncResponse,
        @ApiParam(value = "Specify the tenant", required = true)
        @PathParam("tenant") String tenant,
        @ApiParam(value = "Specify the namespace", required = true)
        @PathParam("namespace") String namespace,
        @ApiParam(value = "Specify topic name", required = true)
        @PathParam("topic") @Encoded String encodedTopic,
        @ApiParam(value = "Get per partition stats")
        @QueryParam("perPartition") @DefaultValue("true") boolean perPartition,
        @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
        @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
        @ApiParam(value = "If return precise backlog or imprecise backlog")
        @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
        @ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful "
                + "not to use when there's heavy traffic.")
        @QueryParam("subscriptionBacklogSize") @DefaultValue("true") boolean subscriptionBacklogSize,
        @ApiParam(value = "If return the earliest time in backlog")
        @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog,
        @ApiParam(value = "If exclude the publishers")
        @QueryParam("excludePublishers") @DefaultValue("false") boolean excludePublishers,
        @ApiParam(value = "If exclude the consumers")
        @QueryParam("excludeConsumers") @DefaultValue("false") boolean excludeConsumers)

Add the excludePublishers and excludeConsumers parameters to {tenant}/{namespace}/{topic}/stats API

public void getStats(
        @Suspended final AsyncResponse asyncResponse,
        @ApiParam(value = "Specify the tenant", required = true)
        @PathParam("tenant") String tenant,
        @ApiParam(value = "Specify the namespace", required = true)
        @PathParam("namespace") String namespace,
        @ApiParam(value = "Specify topic name", required = true)
        @PathParam("topic") @Encoded String encodedTopic,
        @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
        @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
        @ApiParam(value = "If return precise backlog or imprecise backlog")
        @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
        @ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful "
                + "not to use when there's heavy traffic.")
        @QueryParam("subscriptionBacklogSize") @DefaultValue("true") boolean subscriptionBacklogSize,
        @ApiParam(value = "If return time of the earliest message in backlog")
        @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog,
        @ApiParam(value = "If exclude the publishers")
        @QueryParam("excludePublishers") @DefaultValue("false") boolean excludePublishers,
        @ApiParam(value = "If exclude the consumers")
        @QueryParam("excludeConsumers") @DefaultValue("false") boolean excludeConsumers)
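
Because both new parameters are declared with `@DefaultValue("false")`, a request that omits them should behave as before. The sketch below models that defaulting with plain string parsing. It is an illustration under that assumption, not how JAX-RS binds parameters internally, and `QueryDefaultsSketch` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of query-parameter defaulting; JAX-RS does the real binding on the broker.
final class QueryDefaultsSketch {
    // Split "a=1&b=2" into a name -> value map; pairs without '=' are ignored.
    static Map<String, String> parse(String query) {
        Map<String, String> out = new HashMap<>();
        if (query == null || query.isEmpty()) {
            return out;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                out.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return out;
    }

    // Return the parsed flag, or the declared default when the parameter is absent.
    static boolean flag(Map<String, String> params, String name, boolean defaultValue) {
        String raw = params.get(name);
        return raw == null ? defaultValue : Boolean.parseBoolean(raw);
    }
}
```

With this model, a request carrying only `excludePublishers=true` drops publishers while still returning consumers, and a request with no query string returns both.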

Add a new method for

CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions getStatsOptions);

public class GetStatsOptions {
    /** Set to true to get the precise backlog; otherwise get the imprecise backlog. */
    private final boolean getPreciseBacklog;

    /** Whether to get backlog size for each subscription. */
    private final boolean subscriptionBacklogSize;

    /** Whether to get the earliest time in backlog. */
    private final boolean getEarliestTimeInBacklog;

    /** Whether to exclude publishers. */
    private final boolean excludePublishers;

    /** Whether to exclude consumers. */
    private final boolean excludeConsumers;
}


Add the following logic in and

    if (!excludePublishers) { /* ... collect publisher stats ... */ }
    if (!excludeConsumers) { /* ... collect consumer stats ... */ }
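
The two guards above can be sketched in isolation as follows. This is a simplified model, assuming publisher and consumer entries are just names held in lists. `StatsFilterSketch` and its nested `Stats` type are hypothetical and far smaller than the broker's real stats classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the broker-side guards; not the real topic stats code.
final class StatsFilterSketch {
    static final class Stats {
        final List<String> publishers = new ArrayList<>();
        final List<String> consumers = new ArrayList<>();
    }

    static Stats collect(List<String> producerNames,
                         List<String> consumerNames,
                         boolean excludePublishers,
                         boolean excludeConsumers) {
        Stats stats = new Stats();
        // Skip the per-producer entries entirely when the caller asked to exclude them.
        if (!excludePublishers) {
            stats.publishers.addAll(producerNames);
        }
        // Same guard for the per-consumer entries.
        if (!excludeConsumers) {
            stats.consumers.addAll(consumerNames);
        }
        return stats;
    }
}
```

Skipping the collection step, rather than building the full lists and stripping them afterwards, is what would save both broker work and response size for topics with thousands of clients.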

Public-facing Changes

Public API

Binary protocol





Security Considerations

Backward & Forward Compatibility




General Notes
