diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 8503459fe9194..54b98b647a0c4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2725,6 +2725,18 @@ protected CompletableFuture internalSetMaxSubscriptionsPerTopic(Integer ma return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); } + protected Optional internalGetReplicatorDispatchRate() { + preValidation(); + return getTopicPolicies(topicName).map(TopicPolicies::getReplicatorDispatchRate); + } + + protected CompletableFuture internalSetReplicatorDispatchRate(DispatchRate dispatchRate) { + preValidation(); + TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new); + topicPolicies.setReplicatorDispatchRate(dispatchRate); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + } + private void preValidation() { validateAdminAccessForTenant(namespaceName.getTenant()); validatePoliciesReadOnlyAccess(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 888a47863d40d..62173719e70be 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -1902,6 +1902,90 @@ public void removeMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncR }); } + @GET + @Path("/{tenant}/{namespace}/{topic}/replicatorDispatchRate") + @ApiOperation(value = "Get replicatorDispatchRate config for specified topic.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 405, + message = "Topic level policy is disabled, to enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification")}) + public void getReplicatorDispatchRate(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic) { + validateTopicName(tenant, namespace, encodedTopic); + try { + Optional dispatchRate = internalGetReplicatorDispatchRate(); + if (dispatchRate.isPresent()) { + asyncResponse.resume(dispatchRate.get()); + } else { + asyncResponse.resume(Response.noContent().build()); + } + } catch (RestException e) { + asyncResponse.resume(e); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } + } + + @POST + @Path("/{tenant}/{namespace}/{topic}/replicatorDispatchRate") + @ApiOperation(value = "Set replicatorDispatchRate config for specified topic.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 405, + message = "Topic level policy is disabled, to enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification"), + @ApiResponse(code = 412, message = "Invalid value of replicatorDispatchRate")}) + public void setReplicatorDispatchRate(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Replicator dispatch rate of the topic") + DispatchRate dispatchRate) { + validateTopicName(tenant, namespace, encodedTopic); + internalSetReplicatorDispatchRate(dispatchRate).whenComplete((r, ex) -> { + if (ex instanceof RestException) { + log.error("Updating replicatorDispatchRate failed", ex); + asyncResponse.resume(ex); + } else if (ex != null) { + log.error("Updating replicatorDispatchRate failed", ex); + asyncResponse.resume(new RestException(ex)); + } else { + log.info("[{}] Successfully updated replicatorDispatchRate: namespace={}, topic={}" + + ", replicatorDispatchRate={}" + , clientAppId(), namespaceName, topicName.getLocalName(), dispatchRate); + asyncResponse.resume(Response.noContent().build()); + } + }); + } + + @DELETE + @Path("/{tenant}/{namespace}/{topic}/replicatorDispatchRate") + @ApiOperation(value = "Remove replicatorDispatchRate config for specified topic.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 405, + message = "Topic level policy is disabled, to enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification")}) + public void removeReplicatorDispatchRate(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic) { + validateTopicName(tenant, namespace, encodedTopic); + internalSetReplicatorDispatchRate(null).whenComplete((r, ex) -> { + if (ex != null) { + log.error("Failed to remove replicatorDispatchRate", ex); + asyncResponse.resume(new RestException(ex)); + } else { + log.info("[{}] Successfully remove replicatorDispatchRate limit: namespace={}, topic={}", + clientAppId(), namespaceName, topicName.getLocalName()); + asyncResponse.resume(Response.noContent().build()); + } + }); + } + @GET @Path("/{tenant}/{namespace}/{topic}/maxProducers") @ApiOperation(value = "Get maxProducers config for specified topic.") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index 7fb4d43e9c408..285d5877de165 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -143,8 +143,9 @@ private DispatchRate createDispatchRate() { } /** - * Update dispatch-throttling-rate. gives first priority to namespace-policy configured dispatch rate else applies - * default broker dispatch-throttling-rate + * Update dispatch-throttling-rate. + * Topic-level has the highest priority, then namespace-level, and finally use dispatch-throttling-rate in + * broker-level */ public void updateDispatchRate() { Optional dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type); @@ -189,6 +190,13 @@ public static Optional getTopicPolicyDispatchRate(BrokerService br .getTopicPolicies(TopicName.get(topicName))) .map(TopicPolicies::getSubscriptionDispatchRate); break; + case REPLICATOR: + dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService() + .getTopicPolicies(TopicName.get(topicName))) + .map(TopicPolicies::getReplicatorDispatchRate); + break; + default: + break; } } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) { log.debug("Topic {} policies cache have not init.", topicName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 9f8a447839533..0e0cc8eb712f4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -497,11 +497,13 @@ public CompletableFuture> addProducer(Producer producer, }); } + @Override protected CompletableFuture incrementTopicEpoch(Optional currentEpoch) { long newEpoch = currentEpoch.orElse(-1L) + 1; return setTopicEpoch(newEpoch); } + @Override protected CompletableFuture setTopicEpoch(long newEpoch) { CompletableFuture future = new CompletableFuture<>(); ledger.asyncSetProperty(TOPIC_EPOCH_PROPERTY_NAME, String.valueOf(newEpoch), new UpdatePropertiesCallback() { @@ -2059,7 +2061,7 @@ public CompletableFuture onPoliciesUpdate(Policies data) { } }); replicators.forEach((name, replicator) -> - replicator.getRateLimiter().ifPresent(rateLimiter -> rateLimiter.onPoliciesUpdate(data)) + replicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate) ); checkMessageExpiry(); CompletableFuture replicationFuture = checkReplicationAndRetryOnFailure(); @@ -2662,10 +2664,12 @@ public void onUpdate(TopicPolicies policies) { } initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies)); - if (this.subscribeRateLimiter.isPresent() && policies != null) { + if (this.subscribeRateLimiter.isPresent()) { subscribeRateLimiter.ifPresent(subscribeRateLimiter -> subscribeRateLimiter.onSubscribeRateUpdate(policies.getSubscribeRate())); } + replicators.forEach((name, replicator) -> replicator.getRateLimiter() + .ifPresent(DispatchRateLimiter::updateDispatchRate)); } private Optional getNamespacePolicies() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index f6e9eda36a1d9..e8d09d7023f87 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -1387,4 +1387,25 @@ public void testMaxSubscriptionsPerTopic() throws Exception { c.close(); } } + + @Test(timeOut = 30000) + public void testReplicatorRateApi() throws Exception { + final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID(); + // init cache + pulsarClient.newProducer().topic(topic).create().close(); + Awaitility.await().atMost(5, TimeUnit.SECONDS) + .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); + + assertNull(admin.topics().getReplicatorDispatchRate(topic)); + + DispatchRate dispatchRate = new DispatchRate(100,200L,10); + admin.topics().setReplicatorDispatchRate(topic, dispatchRate); + Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() + -> assertEquals(admin.topics().getReplicatorDispatchRate(topic), dispatchRate)); + + admin.topics().removeReplicatorDispatchRate(topic); + Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() + -> assertNull(admin.topics().getReplicatorDispatchRate(topic))); + } + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java index 4c04ccf1a504a..9e68cc5c36d4c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java @@ -31,7 +31,9 @@ import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -41,6 +43,10 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + /** * Starts 3 brokers that are in 3 different clusters */ @@ -74,6 +80,73 @@ public Object[][] dispatchRateProvider() { return new Object[][] { { DispatchRateType.messageRate }, { DispatchRateType.byteRate } }; } + @Test + public void testReplicatorRatePriority() throws Exception { + shutdown(); + config1.setSystemTopicEnabled(true); + config1.setTopicLevelPoliciesEnabled(true); + config1.setDispatchThrottlingRatePerReplicatorInMsg(100); + config1.setDispatchThrottlingRatePerReplicatorInByte(200L); + setup(); + + final String namespace = "pulsar/replicatorchange-" + System.currentTimeMillis(); + final String topicName = "persistent://" + namespace + "/ratechange"; + + admin1.namespaces().createNamespace(namespace); + // set 2 clusters, there will be 1 replicator in each topic + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); + @Cleanup + PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()) + .statsInterval(0, TimeUnit.SECONDS).build(); + client1.newProducer().topic(topicName).create().close(); + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get(); + Awaitility.await().atMost(3, TimeUnit.SECONDS) + .until(() -> pulsar1.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName))); + + //use broker-level by default + assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent()); + assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 100); + assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 200L); + + //set namespace-level policy, which should take effect + DispatchRate nsDispatchRate = new DispatchRate(50, 60L, 70); + admin1.namespaces().setReplicatorDispatchRate(namespace, nsDispatchRate); + Awaitility.await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(admin1.namespaces().getReplicatorDispatchRate(namespace), nsDispatchRate)); + assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 50); + assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 60L); + + //set topic-level policy, which should take effect + DispatchRate topicRate = new DispatchRate(10, 20L, 30); + admin1.topics().setReplicatorDispatchRate(topicName, topicRate); + Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> + assertEquals(admin1.topics().getReplicatorDispatchRate(topicName), topicRate)); + assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 10); + assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L); + + //Set the namespace-level policy, which should not take effect + DispatchRate nsDispatchRate2 = new DispatchRate(500, 600L, 700); + admin1.namespaces().setReplicatorDispatchRate(namespace, nsDispatchRate2); + Awaitility.await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(admin1.namespaces().getReplicatorDispatchRate(namespace), nsDispatchRate2)); + assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L); + + //remove topic-level policy, namespace-level should take effect + admin1.topics().removeReplicatorDispatchRate(topicName); + Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> + assertNull(admin1.topics().getReplicatorDispatchRate(topicName))); + assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 500); + assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), + 600L); + + //remove namespace-level policy, broker-level should take effect + admin1.namespaces().setReplicatorDispatchRate(namespace, null); + Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> + assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 100)); + assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), + 200L); + } + /** * verifies dispatch rate for replicators get changed once namespace policies changed. * @@ -103,7 +176,6 @@ public void testReplicatorRateLimiterDynamicallyChange() throws Exception { .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); producer.close(); - PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get(); // 1. default replicator throttling not configured diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java index 3c68986a93240..3f275d4bb4816 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -2161,6 +2161,70 @@ void setInactiveTopicPolicies(String topic */ CompletableFuture removeSubscriptionDispatchRateAsync(String topic); + /** + * Set replicatorDispatchRate for the topic. + *

+ * Replicator dispatch rate under this topic can dispatch this many messages per second + * + * @param topic + * @param dispatchRate + * number of messages per second + * @throws PulsarAdminException + * Unexpected error + */ + void setReplicatorDispatchRate(String topic, DispatchRate dispatchRate) throws PulsarAdminException; + + /** + * Set replicatorDispatchRate for the topic asynchronously. + *

+ * Replicator dispatch rate under this topic can dispatch this many messages per second. + * + * @param topic + * @param dispatchRate + * number of messages per second + */ + CompletableFuture setReplicatorDispatchRateAsync(String topic, DispatchRate dispatchRate); + + /** + * Get replicatorDispatchRate for the topic. + *

+ * Replicator dispatch rate under this topic can dispatch this many messages per second. + * + * @param topic + * @returns DispatchRate + * number of messages per second + * @throws PulsarAdminException + * Unexpected error + */ + DispatchRate getReplicatorDispatchRate(String topic) throws PulsarAdminException; + + /** + * Get replicatorDispatchRate asynchronously. + *

+ * Replicator dispatch rate under this topic can dispatch this many messages per second. + * + * @param topic + * @returns DispatchRate + * number of messages per second + */ + CompletableFuture getReplicatorDispatchRateAsync(String topic); + + /** + * Remove replicatorDispatchRate for a topic. + * @param topic + * Topic name + * @throws PulsarAdminException + * Unexpected error + */ + void removeReplicatorDispatchRate(String topic) throws PulsarAdminException; + + /** + * Remove replicatorDispatchRate for a topic asynchronously. + * @param topic + * Topic name + */ + CompletableFuture removeReplicatorDispatchRateAsync(String topic); + /** * Get the compactionThreshold for a topic. The maximum number of bytes * can have before compaction is triggered. 0 disables. diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 617ed32ecd0d1..eda3443fbac41 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -2938,6 +2938,82 @@ public CompletableFuture removeDeduplicationSnapshotIntervalAsync(String t return asyncDeleteRequest(path); } + @Override + public DispatchRate getReplicatorDispatchRate(String topic) throws PulsarAdminException { + try { + return getReplicatorDispatchRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture getReplicatorDispatchRateAsync(String topic) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "replicatorDispatchRate"); + final CompletableFuture future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback() { + @Override + public void completed(DispatchRate dispatchRate) { + future.complete(dispatchRate); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + + @Override + public void setReplicatorDispatchRate(String topic, DispatchRate dispatchRate) throws PulsarAdminException { + try { + setReplicatorDispatchRateAsync(topic, dispatchRate).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture setReplicatorDispatchRateAsync(String topic, DispatchRate dispatchRate) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "replicatorDispatchRate"); + return asyncPostRequest(path, Entity.entity(dispatchRate, MediaType.APPLICATION_JSON)); + } + + @Override + public void removeReplicatorDispatchRate(String topic) throws PulsarAdminException { + try { + removeReplicatorDispatchRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture removeReplicatorDispatchRateAsync(String topic) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "replicatorDispatchRate"); + return asyncDeleteRequest(path); + } + @Override public SubscribeRate getSubscribeRate(String topic) throws PulsarAdminException { try { diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 210d9d73729e5..477f03259265a 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -770,6 +770,16 @@ public void topics() throws Exception { cmdTopics.run(split("disable-deduplication persistent://myprop/clust/ns1/ds1")); verify(mockTopics).enableDeduplication("persistent://myprop/clust/ns1/ds1", false); + cmdTopics.run(split("get-replicator-dispatch-rate persistent://myprop/clust/ns1/ds1")); + verify(mockTopics).getReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1"); + + cmdTopics.run(split("set-replicator-dispatch-rate persistent://myprop/clust/ns1/ds1 -md 10 -bd 11 -dt 12")); + verify(mockTopics).setReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1", + new DispatchRate(10,11,12)); + + cmdTopics.run(split("remove-replicator-dispatch-rate persistent://myprop/clust/ns1/ds1")); + verify(mockTopics).removeReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("get-deduplication-enabled persistent://myprop/clust/ns1/ds1")); verify(mockTopics).getDeduplicationEnabled("persistent://myprop/clust/ns1/ds1"); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 8488e171c6303..fff4a59d99408 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -146,6 +146,10 @@ public CmdTopics(PulsarAdmin admin) { jcommander.addCommand("set-subscription-dispatch-rate", new SetSubscriptionDispatchRate()); jcommander.addCommand("remove-subscription-dispatch-rate", new RemoveSubscriptionDispatchRate()); + jcommander.addCommand("get-replicator-dispatch-rate", new GetReplicatorDispatchRate()); + jcommander.addCommand("set-replicator-dispatch-rate", new SetReplicatorDispatchRate()); + jcommander.addCommand("remove-replicator-dispatch-rate", new RemoveReplicatorDispatchRate()); + jcommander.addCommand("get-compaction-threshold", new GetCompactionThreshold()); jcommander.addCommand("set-compaction-threshold", new SetCompactionThreshold()); jcommander.addCommand("remove-compaction-threshold", new RemoveCompactionThreshold()); @@ -1682,6 +1686,59 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Get replicator message-dispatch-rate for a topic") + private class GetReplicatorDispatchRate extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String topic = validatePersistentTopic(params); + print(admin.topics().getReplicatorDispatchRate(topic)); + } + } + + @Parameters(commandDescription = "Set replicator message-dispatch-rate for a topic") + private class SetReplicatorDispatchRate extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = { "--msg-dispatch-rate", + "-md" }, description = "message-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false) + private int msgDispatchRate = -1; + + @Parameter(names = { "--byte-dispatch-rate", + "-bd" }, description = "byte-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false) + private long byteDispatchRate = -1; + + @Parameter(names = { "--dispatch-rate-period", + "-dt" }, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not passed)\n", required = false) + private int dispatchRatePeriodSec = 1; + + @Parameter(names = { "--relative-to-publish-rate", + "-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled then broker will apply throttling value to (publish-rate + dispatch rate))\n", required = false) + private boolean relativeToPublishRate = false; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + admin.topics().setReplicatorDispatchRate(persistentTopic, + new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec, relativeToPublishRate)); + } + } + + @Parameters(commandDescription = "Remove replicator message-dispatch-rate for a topic") + private class RemoveReplicatorDispatchRate extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + admin.topics().removeReplicatorDispatchRate(persistentTopic); + } + } + @Parameters(commandDescription = "Get max number of producers for a topic") private class GetMaxProducers extends CliCommand { @Parameter(description = "persistent://tenant/namespace/topic", required = true) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java index a6c7ebf09f7a0..3c0c78e90c5d2 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java @@ -57,6 +57,11 @@ public class TopicPolicies { private Integer deduplicationSnapshotIntervalSeconds = null; private Integer maxMessageSize = null; private Integer maxSubscriptionsPerTopic = null; + private DispatchRate replicatorDispatchRate = null; + + public boolean isReplicatorDispatchRateSet() { + return replicatorDispatchRate != null; + } public boolean isMaxSubscriptionsPerTopicSet() { return maxSubscriptionsPerTopic != null;