diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index f1040840a09ff..d6dd9d9105061 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -307,6 +307,17 @@ public void testCreateSubscriptions() throws Exception{ System.out.println("Message back log for " + SUB_NONE_MESSAGE_ID + " is :" + msgBacklog); Assert.assertEquals(msgBacklog, 0); + // 5) Create replicated subscription + response = mock(AsyncResponse.class); + String replicateSubName = "sub-none-message-id-replicated-sub"; + persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName, replicateSubName, true, + null, true); + responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + TopicStats stats = persistentTopics.getStats(testTenant, testNamespace, testLocalTopicName, true, true, false, false); + Assert.assertNotNull(stats.getSubscriptions().get(replicateSubName)); + Assert.assertTrue(stats.getSubscriptions().get(replicateSubName).isReplicated()); producer.close(); } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 27c13ef137e96..1110591eeb2b6 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -1595,7 +1595,49 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) * @throws PulsarAdminException * Unexpected error */ - void createSubscription(String topic, String subscriptionName, MessageId messageId) + default void createSubscription(String topic, String subscriptionName, MessageId messageId) + throws PulsarAdminException { + createSubscription(topic, subscriptionName, messageId, false); + }; + + /** + * Create a new subscription on a topic. + * + * @param topic + * topic name + * @param subscriptionName + * Subscription name + * @param messageId + * The {@link MessageId} on where to initialize the subscription. It could be {@link MessageId#latest}, + * {@link MessageId#earliest} or a specific message id. + */ + default CompletableFuture createSubscriptionAsync(String topic, String subscriptionName, + MessageId messageId) { + return createSubscriptionAsync(topic, subscriptionName, messageId, false); + } + + /** + * Create a new subscription on a topic. + * + * @param topic + * topic name + * @param subscriptionName + * Subscription name + * @param messageId + * The {@link MessageId} on where to initialize the subscription. It could be {@link MessageId#latest}, + * {@link MessageId#earliest} or a specific message id. + * @param replicated + * replicated subscriptions. + * @throws NotAuthorizedException + * Don't have admin permission + * @throws ConflictException + * Subscription already exists + * @throws NotAllowedException + * Command disallowed for requested resource + * @throws PulsarAdminException + * Unexpected error + */ + void createSubscription(String topic, String subscriptionName, MessageId messageId, boolean replicated) throws PulsarAdminException; /** @@ -1608,8 +1650,12 @@ void createSubscription(String topic, String subscriptionName, MessageId message * @param messageId * The {@link MessageId} on where to initialize the subscription. It could be {@link MessageId#latest}, * {@link MessageId#earliest} or a specific message id. + * + * @param replicated + * replicated subscriptions. */ - CompletableFuture createSubscriptionAsync(String topic, String subscriptionName, MessageId messageId); + CompletableFuture createSubscriptionAsync(String topic, String subscriptionName, MessageId messageId, + boolean replicated); /** * Reset cursor position on a topic subscription. diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 66be8e7266f7d..1f2232068f3ec 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -1097,24 +1097,18 @@ public MessageId getMessageIdByTimestamp(String topic, long timestamp) @Override - public void createSubscription(String topic, String subscriptionName, MessageId messageId) + public void createSubscription(String topic, String subscriptionName, MessageId messageId, boolean replicated) throws PulsarAdminException { - try { - TopicName tn = validateTopic(topic); - String encodedSubName = Codec.encode(subscriptionName); - WebTarget path = topicPath(tn, "subscription", encodedSubName); - request(path).put(Entity.entity(messageId, MediaType.APPLICATION_JSON), ErrorData.class); - } catch (Exception e) { - throw getApiException(e); - } + sync(() -> createSubscriptionAsync(topic, subscriptionName, messageId, replicated)); } @Override public CompletableFuture createSubscriptionAsync(String topic, String subscriptionName, - MessageId messageId) { + MessageId messageId, boolean replicated) { TopicName tn = validateTopic(topic); String encodedSubName = Codec.encode(subscriptionName); WebTarget path = topicPath(tn, "subscription", encodedSubName); + path = path.queryParam("replicated", replicated); return asyncPutRequest(path, Entity.entity(messageId, MediaType.APPLICATION_JSON)); } diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index b10cae36f2167..b8a8612fa27cc 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -1409,7 +1409,11 @@ public void topics() throws Exception { verify(mockTopics).expireMessagesForAllSubscriptions("persistent://myprop/clust/ns1/ds1", 100); cmdTopics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest")); - verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest); + verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, false); + + cmdTopics = new CmdTopics(() -> admin); + cmdTopics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest -r")); + verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, true); cmdTopics.run(split("create-partitioned-topic persistent://myprop/clust/ns1/ds1 --partitions 32")); verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32, null); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 8a67c224dfe93..69823d07d91bc 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -925,11 +925,13 @@ private class CreateSubscription extends CliCommand { "--subscription" }, description = "Subscription to reset position on", required = true) private String subscriptionName; - @Parameter(names = { "--messageId", - "-m" }, description = "messageId where to create the subscription. " + @Parameter(names = { "-m" , "--messageId" }, description = "messageId where to create the subscription. " + "It can be either 'latest', 'earliest' or (ledgerId:entryId)", required = false) private String messageIdStr = "latest"; + @Parameter(names = { "-r", "--replicated" }, description = "replicated subscriptions", required = false) + private boolean replicated = false; + @Override void run() throws PulsarAdminException { String topic = validateTopicName(params); @@ -942,7 +944,7 @@ void run() throws PulsarAdminException { messageId = validateMessageIdString(messageIdStr); } - getTopics().createSubscription(topic, subscriptionName, messageId); + getTopics().createSubscription(topic, subscriptionName, messageId, replicated); } }