diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java index 28c3a68e70dde..f9eecf2872d25 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java @@ -131,7 +131,8 @@ static class Arguments extends PerformanceBaseArguments { description = "Enable autoScaledReceiverQueueSize") public boolean autoScaledReceiverQueueSize = false; - @Parameter(names = { "--replicated" }, description = "Whether the subscription status should be replicated") + @Parameter(names = {"-rs", "--replicated" }, + description = "Whether the subscription status should be replicated") public boolean replicatedSubscription = false; @Parameter(names = { "--acks-delay-millis" }, description = "Acknowledgements grouping delay in millis") diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java index e5672194a2498..a7754516afec2 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java @@ -136,6 +136,10 @@ static class Arguments extends PerformanceBaseArguments { @Parameter(names = {"-st", "--subscription-type"}, description = "Subscription type") public SubscriptionType subscriptionType = SubscriptionType.Shared; + @Parameter(names = {"-rs", "--replicated" }, + description = "Whether the subscription status should be replicated") + private boolean replicatedSubscription = false; + @Parameter(names = {"-q", "--receiver-queue-size"}, description = "Size of the receiver queue") public int receiverQueueSize = 1000; @@ -625,10 +629,11 @@ private static void printAggregatedStats() { private static List>> buildConsumer(PulsarClient client, Arguments arguments) throws ExecutionException, InterruptedException { - ConsumerBuilder consumerBuilder = client.newConsumer(Schema.BYTES) // + ConsumerBuilder consumerBuilder = client.newConsumer(Schema.BYTES) .subscriptionType(arguments.subscriptionType) .receiverQueueSize(arguments.receiverQueueSize) - .subscriptionInitialPosition(arguments.subscriptionInitialPosition); + .subscriptionInitialPosition(arguments.subscriptionInitialPosition) + .replicateSubscriptionState(arguments.replicatedSubscription); Iterator consumerTopicsIterator = arguments.consumerTopic.iterator(); List>> consumers = new ArrayList<>(arguments.consumerTopic.size()); diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java index 2ccdf62ff0ebd..12f457587f685 100644 --- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java +++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java @@ -93,7 +93,7 @@ protected void cleanup() throws Exception { @Test public void testTxnPerf() throws Exception { - String argString = "--topics-c %s --topics-p %s -threads 1 -ntxn 50 -u %s -ss %s -np 1 -au %s"; + String argString = "--topics-c %s --topics-p %s -threads 1 -ntxn 50 -u %s -ss %s -rs -np 1 -au %s"; String testConsumeTopic = testTopic + UUID.randomUUID(); String testProduceTopic = testTopic + UUID.randomUUID(); String testSub = "testSub"; @@ -108,12 +108,14 @@ public void testTxnPerf() throws Exception { .connectionsPerBroker(100) .statsInterval(0, TimeUnit.SECONDS) .build(); + @Cleanup Producer produceToConsumeTopic = pulsarClient.newProducer(Schema.BYTES) .producerName("perf-transaction-producer") .sendTimeout(0, TimeUnit.SECONDS) .topic(testConsumeTopic) .create(); - pulsarClient.newConsumer(Schema.BYTES) + @Cleanup + final Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) .consumerName("perf-transaction-consumeVerify") .topic(testConsumeTopic) .subscriptionType(SubscriptionType.Shared) @@ -138,6 +140,9 @@ public void testTxnPerf() throws Exception { }); thread.start(); thread.join(); + Assert.assertTrue(admin.topics().getPartitionedStats(testConsumeTopic, false) + .getSubscriptions().get(testSub).isReplicated()); + @Cleanup Consumer consumeFromConsumeTopic = pulsarClient.newConsumer(Schema.BYTES) .consumerName("perf-transaction-consumeVerify") .topic(testConsumeTopic) @@ -145,6 +150,7 @@ public void testTxnPerf() throws Exception { .subscriptionName(testSub) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(); + @Cleanup Consumer consumeFromProduceTopic = pulsarClient.newConsumer(Schema.BYTES) .consumerName("perf-transaction-produceVerify") .topic(testProduceTopic) @@ -160,6 +166,7 @@ public void testTxnPerf() throws Exception { Assert.assertNull(message); message = consumeFromProduceTopic.receive(2, TimeUnit.SECONDS); Assert.assertNull(message); + } @@ -169,7 +176,8 @@ public void testProduceTxnMessage() throws InterruptedException, PulsarClientExc String topic = testTopic + UUID.randomUUID(); int totalMessage = 100; String args = String.format(argString, topic, pulsar.getBrokerServiceUrl(), totalMessage); - pulsarClient.newConsumer().subscriptionName("subName" + "pre").topic(topic) + @Cleanup + final Consumer subscribe = pulsarClient.newConsumer().subscriptionName("subName" + "pre").topic(topic) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscriptionType(SubscriptionType.Exclusive) .enableBatchIndexAcknowledgment(false) @@ -190,6 +198,7 @@ public void testProduceTxnMessage() throws InterruptedException, PulsarClientExc }); }); + @Cleanup Consumer consumer = pulsarClient.newConsumer().subscriptionName("subName").topic(topic) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscriptionType(SubscriptionType.Exclusive) @@ -211,9 +220,11 @@ public void testConsumeTxnMessage() throws Exception { String topic = testTopic + UUID.randomUUID(); String args = String.format(argString, topic, pulsar.getBrokerServiceUrl(), subName, SubscriptionType.Exclusive, SubscriptionInitialPosition.Earliest, 10); + @Cleanup Producer producer = pulsarClient.newProducer().topic(topic).sendTimeout(0, TimeUnit.SECONDS) .create(); - pulsarClient.newConsumer(Schema.BYTES) + @Cleanup + final Consumer subscribe = pulsarClient.newConsumer(Schema.BYTES) .consumerName("perf-transaction-consumeVerify") .topic(topic) .subscriptionType(SubscriptionType.Shared) @@ -240,6 +251,7 @@ public void testConsumeTxnMessage() throws Exception { }); }); + @Cleanup Consumer consumer = pulsarClient.newConsumer().subscriptionName(subName).topic(topic) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscriptionType(SubscriptionType.Exclusive) @@ -252,6 +264,7 @@ public void testConsumeTxnMessage() throws Exception { } Message message = consumer.receive(2, TimeUnit.SECONDS); Assert.assertNull(message); + } }