Skip to content

Commit

Permalink
In pulsar admin Java API, unified persistent vs non-persistent into "…
Browse files Browse the repository at this point in the history
…topics()" (apache#1634)

* In pulsar admin Java API, unified persistent vs non-persistent into "topics()"

* Fixed tests

* Merged and fixed tests

* Couple more tests fixes

* Consolidated pulsar-admin persistent|non-persistent into 'topics' command
  • Loading branch information
merlimat authored Apr 27, 2018
1 parent 7484664 commit 02eff26
Show file tree
Hide file tree
Showing 71 changed files with 2,575 additions and 1,698 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -404,7 +404,7 @@ protected void internalDeletePartitionedTopic(boolean authoritative) {
try {
for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
pulsar().getAdminClient().persistentTopics().deleteAsync(topicNamePartition.toString())
pulsar().getAdminClient().topics().deleteAsync(topicNamePartition.toString())
.whenComplete((r, ex) -> {
if (ex != null) {
if (ex instanceof NotFoundException) {
Expand Down Expand Up @@ -503,7 +503,7 @@ protected List<String> internalGetSubscriptions(boolean authoritative) {
try {
// get the subscriptions only from the 1st partition since all the other partitions will have the same
// subscriptions
subscriptions.addAll(pulsar().getAdminClient().persistentTopics()
subscriptions.addAll(pulsar().getAdminClient().topics()
.getSubscriptions(topicName.getPartition(0).toString()));
} catch (Exception e) {
throw new RestException(e);
Expand All @@ -523,7 +523,7 @@ protected List<String> internalGetSubscriptions(boolean authoritative) {
return subscriptions;
}

protected PersistentTopicStats internalGetStats(boolean authoritative) {
protected TopicStats internalGetStats(boolean authoritative) {
validateAdminAndClientPermission();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
Expand Down Expand Up @@ -575,7 +575,7 @@ protected PartitionedTopicStats internalGetPartitionedStats(boolean authoritativ
PartitionedTopicStats stats = new PartitionedTopicStats(partitionMetadata);
try {
for (int i = 0; i < partitionMetadata.partitions; i++) {
PersistentTopicStats partitionStats = pulsar().getAdminClient().persistentTopics()
TopicStats partitionStats = pulsar().getAdminClient().topics()
.getStats(topicName.getPartition(i).toString());
stats.add(partitionStats);
stats.partitions.put(topicName.getPartition(i).toString(), partitionStats);
Expand All @@ -594,7 +594,7 @@ protected void internalDeleteSubscription(String subName, boolean authoritative)
if (partitionMetadata.partitions > 0) {
try {
for (int i = 0; i < partitionMetadata.partitions; i++) {
pulsar().getAdminClient().persistentTopics()
pulsar().getAdminClient().topics()
.deleteSubscription(topicName.getPartition(i).toString(), subName);
}
} catch (Exception e) {
Expand Down Expand Up @@ -637,7 +637,7 @@ protected void internalSkipAllMessages(String subName, boolean authoritative) {
if (partitionMetadata.partitions > 0) {
try {
for (int i = 0; i < partitionMetadata.partitions; i++) {
pulsar().getAdminClient().persistentTopics()
pulsar().getAdminClient().topics()
.skipAllMessages(topicName.getPartition(i).toString(), subName);
}
} catch (Exception e) {
Expand Down Expand Up @@ -707,7 +707,7 @@ protected void internalExpireMessagesForAllSubscriptions(int expireTimeInSeconds
try {
// expire messages for each partition topic
for (int i = 0; i < partitionMetadata.partitions; i++) {
pulsar().getAdminClient().persistentTopics().expireMessagesForAllSubscriptions(
pulsar().getAdminClient().topics().expireMessagesForAllSubscriptions(
topicName.getPartition(i).toString(), expireTimeInSeconds);
}
} catch (Exception e) {
Expand Down Expand Up @@ -740,7 +740,7 @@ protected void internalResetCursor(String subName, long timestamp, boolean autho
Exception partitionException = null;
try {
for (int i = 0; i < numParts; i++) {
pulsar().getAdminClient().persistentTopics().resetCursor(topicName.getPartition(i).toString(),
pulsar().getAdminClient().topics().resetCursor(topicName.getPartition(i).toString(),
subName, timestamp);
}
} catch (PreconditionFailedException pfe) {
Expand Down Expand Up @@ -811,8 +811,7 @@ protected void internalCreateSubscription(String subscriptionName, MessageIdImpl
PulsarAdmin admin = pulsar().getAdminClient();

for (int i = 0; i < partitionMetadata.partitions; i++) {
futures.add(admin.persistentTopics().createSubscriptionAsync(
topicName.getPartition(i).toString(),
futures.add(admin.topics().createSubscriptionAsync(topicName.getPartition(i).toString(),
subscriptionName, messageId));
}

Expand Down Expand Up @@ -1039,7 +1038,7 @@ protected void internalExpireMessages(String subName, int expireTimeInSeconds, b
// expire messages for each partition topic
try {
for (int i = 0; i < partitionMetadata.partitions; i++) {
pulsar().getAdminClient().persistentTopics()
pulsar().getAdminClient().topics()
.expireMessages(topicName.getPartition(i).toString(), subName, expireTimeInSeconds);
}
} catch (Exception e) {
Expand Down Expand Up @@ -1237,13 +1236,13 @@ private CompletableFuture<Void> createSubscriptions(TopicName topicName, int num
return;
}

admin.persistentTopics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats -> {
admin.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats -> {
stats.subscriptions.keySet().forEach(subscription -> {
List<CompletableFuture<Void>> subscriptionFutures = new ArrayList<>();
for (int i = partitionMetadata.partitions; i < numPartitions; i++) {
final String topicNamePartition = topicName.getPartition(i).toString();

subscriptionFutures.add(admin.persistentTopics().createSubscriptionAsync(topicNamePartition,
subscriptionFutures.add(admin.topics().createSubscriptionAsync(topicNamePartition,
subscription, MessageId.latest));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import javax.ws.rs.core.Response.Status;

import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.web.RestException;
Expand Down Expand Up @@ -236,9 +237,11 @@ public List<String> getListFromBundle(@PathParam("property") String property, @P
try {
final List<String> topicList = Lists.newArrayList();
pulsar().getBrokerService().getTopics().forEach((name, topicFuture) -> {
TopicName topicName = TopicName.get(name);
if (nsBundle.includes(topicName)) {
topicList.add(name);
if (BrokerService.extractTopic(topicFuture).isPresent()) {
TopicName topicName = TopicName.get(name);
if (nsBundle.includes(topicName)) {
topicList.add(name);
}
}
});
return topicList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicStats;
import org.apache.pulsar.common.policies.data.TopicStats;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
Expand Down Expand Up @@ -228,7 +228,7 @@ public List<String> getSubscriptions(@PathParam("property") String property, @Pa
@ApiOperation(hidden = true, value = "Get the stats for the topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist") })
public PersistentTopicStats getStats(@PathParam("property") String property, @PathParam("cluster") String cluster,
public TopicStats getStats(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import javax.ws.rs.core.Response.Status;

import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.web.RestException;
Expand Down Expand Up @@ -170,8 +171,7 @@ public List<String> getList(@PathParam("tenant") String tenant, @PathParam("name
for (int i = 0; i < boundaries.size() - 1; i++) {
final String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1));
try {
futures.add(pulsar().getAdminClient().nonPersistentTopics().getListInBundleAsync(namespaceName.toString(),
bundle));
futures.add(pulsar().getAdminClient().topics().getListInBundleAsync(namespaceName.toString(), bundle));
} catch (PulsarServerException e) {
log.error(String.format("[%s] Failed to get list of topics under namespace %s/%s", clientAppId(),
namespaceName, bundle), e);
Expand Down Expand Up @@ -228,9 +228,11 @@ public List<String> getListFromBundle(@PathParam("tenant") String tenant, @PathP
try {
final List<String> topicList = Lists.newArrayList();
pulsar().getBrokerService().getTopics().forEach((name, topicFuture) -> {
TopicName topicName = TopicName.get(name);
if (nsBundle.includes(topicName)) {
topicList.add(name);
if (BrokerService.extractTopic(topicFuture).isPresent()) {
TopicName topicName = TopicName.get(name);
if (nsBundle.includes(topicName)) {
topicList.add(name);
}
}
});
return topicList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicStats;
import org.apache.pulsar.common.policies.data.TopicStats;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
Expand Down Expand Up @@ -223,7 +223,7 @@ public List<String> getSubscriptions(@PathParam("tenant") String tenant,
@ApiOperation(value = "Get the stats for the topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist") })
public PersistentTopicStats getStats(@PathParam("tenant") String tenant,
public TopicStats getStats(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.stats.Metrics;
Expand Down Expand Up @@ -1033,8 +1033,8 @@ public String generateUniqueProducerName() {
return producerNameGenerator.getNextId();
}

public Map<String, PersistentTopicStats> getTopicStats() {
HashMap<String, PersistentTopicStats> stats = new HashMap<>();
public Map<String, TopicStats> getTopicStats() {
HashMap<String, TopicStats> stats = new HashMap<>();
topics.forEach((name, topicFuture) -> {
Optional<Topic> topic = extractTopic(topicFuture);
if (topic.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaVersion;
Expand Down Expand Up @@ -123,7 +123,7 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats

ConcurrentOpenHashMap<String, ? extends Replicator> getReplicators();

PersistentTopicStats getStats();
TopicStats getStats();

PersistentTopicInternalStats getInternalStats();

Expand Down
Loading

0 comments on commit 02eff26

Please sign in to comment.