Skip to content

Commit

Permalink
Fix MsgDropRate missing from NonPersistentTopics stats output. (apach…
Browse files Browse the repository at this point in the history
…e#11119)

Fixes #apache#10495

### Motivation
MsgDropRate info is missing after NonPersistentTopics admin api merged with Topics admin api. This PR is trying to fix this.

### Modifications
Seems due to API merging, data is not properly deserialized in admin client.
And also due to the added TopicsStats interface, the field hiding causing weird behavior with Jackson so fields in NonPersistentTopicStatsImpl intended to hide superclass' fields are not shown in output.

Fixing by not using same field name to hide superclass fields and use @JsonIgnore to hide them from output. And add new fields to store subscription/publisher/replicator info for NonPersistentTopic.
This does change the output name of those info, but it only changed in cli output, for admin client the old getSubscriptions/getSubscriptions/getReplication will still work.
  • Loading branch information
MarvinCai authored Aug 9, 2021
1 parent 177ea06 commit 0aca5f9
Show file tree
Hide file tree
Showing 12 changed files with 477 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1263,7 +1263,8 @@ protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean
try {
stats.add(statFuture.get());
if (perPartition) {
stats.partitions.put(topicName.getPartition(i).toString(), statFuture.get());
stats.getPartitions().put(topicName.getPartition(i).toString(),
(TopicStatsImpl) statFuture.get());
}
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
Expand All @@ -1276,7 +1277,7 @@ protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean
try {
boolean zkPathExists = namespaceResources().getPartitionedTopicResources().exists(path);
if (zkPathExists) {
stats.partitions.put(topicName.toString(), new TopicStatsImpl());
stats.getPartitions().put(topicName.toString(), new TopicStatsImpl());
} else {
asyncResponse.resume(
new RestException(Status.NOT_FOUND,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.ZkAdminPaths;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.web.RestException;
Expand All @@ -57,6 +58,9 @@
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.stats.NonPersistentPartitionedTopicStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
Expand Down Expand Up @@ -202,6 +206,118 @@ public void createPartitionedTopic(
}
}


@GET
@Path("{tenant}/{namespace}/{topic}/partitioned-stats")
@ApiOperation(value = "Get the stats for the partitioned topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
})
public void getPartitionedStats(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Get per partition stats")
@QueryParam("perPartition") @DefaultValue("true") boolean perPartition,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "If return precise backlog or imprecise backlog")
@QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
@ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful "
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize) {
try {
validatePartitionedTopicName(tenant, namespace, encodedTopic);
if (topicName.isGlobal()) {
try {
validateGlobalNamespaceOwnership(namespaceName);
} catch (Exception e) {
log.error("[{}] Failed to get partitioned stats for {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
}
getPartitionedTopicMetadataAsync(topicName,
authoritative, false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions == 0) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned Topic not found"));
return;
}
NonPersistentPartitionedTopicStatsImpl stats =
new NonPersistentPartitionedTopicStatsImpl(partitionMetadata);
List<CompletableFuture<TopicStats>> topicStatsFutureList = Lists.newArrayList();
for (int i = 0; i < partitionMetadata.partitions; i++) {
try {
topicStatsFutureList
.add(pulsar().getAdminClient().topics().getStatsAsync(
(topicName.getPartition(i).toString()), getPreciseBacklog,
subscriptionBacklogSize));
} catch (PulsarServerException e) {
asyncResponse.resume(new RestException(e));
return;
}
}

FutureUtil.waitForAll(topicStatsFutureList).handle((result, exception) -> {
CompletableFuture<TopicStats> statFuture = null;
for (int i = 0; i < topicStatsFutureList.size(); i++) {
statFuture = topicStatsFutureList.get(i);
if (statFuture.isDone() && !statFuture.isCompletedExceptionally()) {
try {
stats.add((NonPersistentTopicStatsImpl) statFuture.get());
if (perPartition) {
stats.getPartitions().put(topicName.getPartition(i).toString(),
(NonPersistentTopicStatsImpl) statFuture.get());
}
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
return null;
}
}
}
if (perPartition && stats.partitions.isEmpty()) {
String path = ZkAdminPaths.partitionedTopicPath(topicName);
try {
boolean zkPathExists = namespaceResources().getPartitionedTopicResources().exists(path);
if (zkPathExists) {
stats.getPartitions().put(topicName.toString(), new NonPersistentTopicStatsImpl());
} else {
asyncResponse.resume(
new RestException(Status.NOT_FOUND,
"Internal topics have not been generated yet"));
return null;
}
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
return null;
}
}
asyncResponse.resume(stats);
return null;
});
}).exceptionally(ex -> {
log.error("[{}] Failed to get partitioned stats for {}", clientAppId(), topicName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}


@PUT
@Path("/{tenant}/{namespace}/{topic}/unload")
@ApiOperation(value = "Unload a topic")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -918,12 +918,14 @@ public void partitionedTopics(String topicType, String topicName) throws Excepti
fail("getPartitionedTopicMetadata of " + anotherTopic + " should not succeed");
} catch (NotFoundException expected) {
}
// check the getPartitionedStats for PartitionedTopic returns only partitions metadata, and no partitions info

PartitionedTopicStats topicStats = admin.topics().getPartitionedStats(partitionedTopicName,false);

// check the getPartitionedStats for PartitionedTopic returns only partitions metadata, and no partitions info
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
admin.topics().getPartitionedStats(partitionedTopicName,false).getMetadata().partitions);
topicStats.getMetadata().partitions);

assertEquals(admin.topics().getPartitionedStats(partitionedTopicName, false).getPartitions().size(),
0);
assertEquals(topicStats.getPartitions().size(), 0);

List<String> subscriptions = admin.topics().getSubscriptions(partitionedTopicName);
assertEquals(subscriptions.size(), 0);
Expand Down Expand Up @@ -985,7 +987,7 @@ public void partitionedTopics(String topicType, String topicName) throws Excepti
partitionedTopicName + "-partition-2", partitionedTopicName + "-partition-3"));

// test cumulative stats for partitioned topic
PartitionedTopicStats topicStats = admin.topics().getPartitionedStats(partitionedTopicName, false);
topicStats = admin.topics().getPartitionedStats(partitionedTopicName,false);
if (isPersistent) {
// TODO: for non-persistent topics, the subscription doesn't exist
assertEquals(topicStats.getSubscriptions().keySet(), Sets.newTreeSet(Lists.newArrayList("my-sub")));
Expand All @@ -996,7 +998,7 @@ public void partitionedTopics(String topicType, String topicName) throws Excepti
assertEquals(topicStats.getPartitions(), Maps.newHashMap());

// test per partition stats for partitioned topic
topicStats = admin.topics().getPartitionedStats(partitionedTopicName, true);
topicStats = admin.topics().getPartitionedStats(partitionedTopicName,true);
assertEquals(topicStats.getMetadata().partitions, 4);
assertEquals(topicStats.getPartitions().keySet(),
Sets.newHashSet(partitionedTopicName + "-partition-0", partitionedTopicName + "-partition-1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
Expand Down Expand Up @@ -322,38 +323,37 @@ private void setTopicPoliciesAndValidate(PulsarAdmin admin2
public void nonPersistentTopics() throws Exception {
final String topicName = "nonPersistentTopic";

final String persistentTopicName = "non-persistent://prop-xyz/ns1/" + topicName;
final String nonPersistentTopicName = "non-persistent://prop-xyz/ns1/" + topicName;
// Force to create a topic
publishMessagesOnTopic("non-persistent://prop-xyz/ns1/" + topicName, 0, 0);
publishMessagesOnTopic(nonPersistentTopicName, 0, 0);

// create consumer and subscription
@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getWebServiceAddress())
.statsInterval(0, TimeUnit.SECONDS)
.build();
Consumer<byte[]> consumer = client.newConsumer().topic(persistentTopicName).subscriptionName("my-sub")
Consumer<byte[]> consumer = client.newConsumer().topic(nonPersistentTopicName).subscriptionName("my-sub")
.subscribe();

publishMessagesOnTopic("non-persistent://prop-xyz/ns1/" + topicName, 10, 0);
publishMessagesOnTopic(nonPersistentTopicName, 10, 0);

TopicStats topicStats = admin.topics().getStats(persistentTopicName);
NonPersistentTopicStats topicStats = (NonPersistentTopicStats) admin.topics().getStats(nonPersistentTopicName);
assertEquals(topicStats.getSubscriptions().keySet(), Sets.newTreeSet(Lists.newArrayList("my-sub")));
assertEquals(topicStats.getSubscriptions().get("my-sub").getConsumers().size(), 1);
assertEquals(topicStats.getSubscriptions().get("my-sub").getMsgDropRate(), 0);
assertEquals(topicStats.getPublishers().size(), 0);
assertEquals(topicStats.getMsgDropRate(), 0);

PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(persistentTopicName, false);
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(nonPersistentTopicName, false);
assertEquals(internalStats.cursors.keySet(), Sets.newTreeSet(Lists.newArrayList("my-sub")));

consumer.close();

topicStats = admin.topics().getStats(persistentTopicName);
topicStats = (NonPersistentTopicStats) admin.topics().getStats(nonPersistentTopicName);
assertTrue(topicStats.getSubscriptions().containsKey("my-sub"));
assertEquals(topicStats.getPublishers().size(), 0);

// test partitioned-topic
final String partitionedTopicName = "non-persistent://prop-xyz/ns1/paritioned";
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 0);
admin.topics().createPartitionedTopic(partitionedTopicName, 5);
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 5);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.policies.data;

import java.util.Map;

/**
* Statistics for a non-persistent partitioned topic.
*/
public interface NonPersistentPartitionedTopicStats extends PartitionedTopicStats{
Map<String, ? extends NonPersistentTopicStats> getPartitions();

NonPersistentTopicStats add(NonPersistentTopicStats ts);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,7 @@ public interface PartitionedTopicStats extends TopicStats {

PartitionedTopicMetadata getMetadata();

Map<String, TopicStats> getPartitions();
Map<String, ? extends TopicStats> getPartitions();

TopicStats add(TopicStats ts);
}
Loading

0 comments on commit 0aca5f9

Please sign in to comment.