Skip to content

Commit

Permalink
Support creating replicate subscription in CLI. (#14692)
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- authored Mar 16, 2022
1 parent 6ce0368 commit b741aa3
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,17 @@ public void testCreateSubscriptions() throws Exception{
System.out.println("Message back log for " + SUB_NONE_MESSAGE_ID + " is :" + msgBacklog);
Assert.assertEquals(msgBacklog, 0);

// 5) Create replicated subscription
response = mock(AsyncResponse.class);
String replicateSubName = "sub-none-message-id-replicated-sub";
persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName, replicateSubName, true,
null, true);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
TopicStats stats = persistentTopics.getStats(testTenant, testNamespace, testLocalTopicName, true, true, false, false);
Assert.assertNotNull(stats.getSubscriptions().get(replicateSubName));
Assert.assertTrue(stats.getSubscriptions().get(replicateSubName).isReplicated());
producer.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1595,7 +1595,49 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds)
* @throws PulsarAdminException
* Unexpected error
*/
void createSubscription(String topic, String subscriptionName, MessageId messageId)
default void createSubscription(String topic, String subscriptionName, MessageId messageId)
throws PulsarAdminException {
createSubscription(topic, subscriptionName, messageId, false);
};

/**
* Create a new subscription on a topic.
*
* @param topic
* topic name
* @param subscriptionName
* Subscription name
* @param messageId
* The {@link MessageId} on where to initialize the subscription. It could be {@link MessageId#latest},
* {@link MessageId#earliest} or a specific message id.
*/
default CompletableFuture<Void> createSubscriptionAsync(String topic, String subscriptionName,
MessageId messageId) {
return createSubscriptionAsync(topic, subscriptionName, messageId, false);
}

/**
* Create a new subscription on a topic.
*
* @param topic
* topic name
* @param subscriptionName
* Subscription name
* @param messageId
* The {@link MessageId} on where to initialize the subscription. It could be {@link MessageId#latest},
* {@link MessageId#earliest} or a specific message id.
* @param replicated
* replicated subscriptions.
* @throws NotAuthorizedException
* Don't have admin permission
* @throws ConflictException
* Subscription already exists
* @throws NotAllowedException
* Command disallowed for requested resource
* @throws PulsarAdminException
* Unexpected error
*/
void createSubscription(String topic, String subscriptionName, MessageId messageId, boolean replicated)
throws PulsarAdminException;

/**
Expand All @@ -1608,8 +1650,12 @@ void createSubscription(String topic, String subscriptionName, MessageId message
* @param messageId
* The {@link MessageId} on where to initialize the subscription. It could be {@link MessageId#latest},
* {@link MessageId#earliest} or a specific message id.
*
* @param replicated
* replicated subscriptions.
*/
CompletableFuture<Void> createSubscriptionAsync(String topic, String subscriptionName, MessageId messageId);
CompletableFuture<Void> createSubscriptionAsync(String topic, String subscriptionName, MessageId messageId,
boolean replicated);

/**
* Reset cursor position on a topic subscription.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1097,24 +1097,18 @@ public MessageId getMessageIdByTimestamp(String topic, long timestamp)


@Override
public void createSubscription(String topic, String subscriptionName, MessageId messageId)
public void createSubscription(String topic, String subscriptionName, MessageId messageId, boolean replicated)
throws PulsarAdminException {
try {
TopicName tn = validateTopic(topic);
String encodedSubName = Codec.encode(subscriptionName);
WebTarget path = topicPath(tn, "subscription", encodedSubName);
request(path).put(Entity.entity(messageId, MediaType.APPLICATION_JSON), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
sync(() -> createSubscriptionAsync(topic, subscriptionName, messageId, replicated));
}

@Override
public CompletableFuture<Void> createSubscriptionAsync(String topic, String subscriptionName,
MessageId messageId) {
MessageId messageId, boolean replicated) {
TopicName tn = validateTopic(topic);
String encodedSubName = Codec.encode(subscriptionName);
WebTarget path = topicPath(tn, "subscription", encodedSubName);
path = path.queryParam("replicated", replicated);
return asyncPutRequest(path, Entity.entity(messageId, MediaType.APPLICATION_JSON));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1409,7 +1409,11 @@ public void topics() throws Exception {
verify(mockTopics).expireMessagesForAllSubscriptions("persistent://myprop/clust/ns1/ds1", 100);

cmdTopics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest"));
verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest);
verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, false);

cmdTopics = new CmdTopics(() -> admin);
cmdTopics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest -r"));
verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, true);

cmdTopics.run(split("create-partitioned-topic persistent://myprop/clust/ns1/ds1 --partitions 32"));
verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -925,11 +925,13 @@ private class CreateSubscription extends CliCommand {
"--subscription" }, description = "Subscription to reset position on", required = true)
private String subscriptionName;

@Parameter(names = { "--messageId",
"-m" }, description = "messageId where to create the subscription. "
@Parameter(names = { "-m" , "--messageId" }, description = "messageId where to create the subscription. "
+ "It can be either 'latest', 'earliest' or (ledgerId:entryId)", required = false)
private String messageIdStr = "latest";

@Parameter(names = { "-r", "--replicated" }, description = "replicated subscriptions", required = false)
private boolean replicated = false;

@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
Expand All @@ -942,7 +944,7 @@ void run() throws PulsarAdminException {
messageId = validateMessageIdString(messageIdStr);
}

getTopics().createSubscription(topic, subscriptionName, messageId);
getTopics().createSubscription(topic, subscriptionName, messageId, replicated);
}
}

Expand Down

0 comments on commit b741aa3

Please sign in to comment.