Skip to content

Commit

Permalink
[improve][cli] Option to create replicated subscriptions in pulsar-pe…
Browse files Browse the repository at this point in the history
…rf txn (#18904)
  • Loading branch information
nicoloboschi authored Dec 14, 2022
1 parent 52c382a commit 9e8bf7a
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ static class Arguments extends PerformanceBaseArguments {
description = "Enable autoScaledReceiverQueueSize")
public boolean autoScaledReceiverQueueSize = false;

@Parameter(names = { "--replicated" }, description = "Whether the subscription status should be replicated")
@Parameter(names = {"-rs", "--replicated" },
description = "Whether the subscription status should be replicated")
public boolean replicatedSubscription = false;

@Parameter(names = { "--acks-delay-millis" }, description = "Acknowledgements grouping delay in millis")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ static class Arguments extends PerformanceBaseArguments {
@Parameter(names = {"-st", "--subscription-type"}, description = "Subscription type")
public SubscriptionType subscriptionType = SubscriptionType.Shared;

@Parameter(names = {"-rs", "--replicated" },
description = "Whether the subscription status should be replicated")
private boolean replicatedSubscription = false;

@Parameter(names = {"-q", "--receiver-queue-size"}, description = "Size of the receiver queue")
public int receiverQueueSize = 1000;

Expand Down Expand Up @@ -625,10 +629,11 @@ private static void printAggregatedStats() {

private static List<List<Consumer<byte[]>>> buildConsumer(PulsarClient client, Arguments arguments)
throws ExecutionException, InterruptedException {
ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer(Schema.BYTES) //
ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer(Schema.BYTES)
.subscriptionType(arguments.subscriptionType)
.receiverQueueSize(arguments.receiverQueueSize)
.subscriptionInitialPosition(arguments.subscriptionInitialPosition);
.subscriptionInitialPosition(arguments.subscriptionInitialPosition)
.replicateSubscriptionState(arguments.replicatedSubscription);

Iterator<String> consumerTopicsIterator = arguments.consumerTopic.iterator();
List<List<Consumer<byte[]>>> consumers = new ArrayList<>(arguments.consumerTopic.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ protected void cleanup() throws Exception {

@Test
public void testTxnPerf() throws Exception {
String argString = "--topics-c %s --topics-p %s -threads 1 -ntxn 50 -u %s -ss %s -np 1 -au %s";
String argString = "--topics-c %s --topics-p %s -threads 1 -ntxn 50 -u %s -ss %s -rs -np 1 -au %s";
String testConsumeTopic = testTopic + UUID.randomUUID();
String testProduceTopic = testTopic + UUID.randomUUID();
String testSub = "testSub";
Expand All @@ -108,12 +108,14 @@ public void testTxnPerf() throws Exception {
.connectionsPerBroker(100)
.statsInterval(0, TimeUnit.SECONDS)
.build();
@Cleanup
Producer<byte[]> produceToConsumeTopic = pulsarClient.newProducer(Schema.BYTES)
.producerName("perf-transaction-producer")
.sendTimeout(0, TimeUnit.SECONDS)
.topic(testConsumeTopic)
.create();
pulsarClient.newConsumer(Schema.BYTES)
@Cleanup
final Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.consumerName("perf-transaction-consumeVerify")
.topic(testConsumeTopic)
.subscriptionType(SubscriptionType.Shared)
Expand All @@ -138,13 +140,17 @@ public void testTxnPerf() throws Exception {
});
thread.start();
thread.join();
Assert.assertTrue(admin.topics().getPartitionedStats(testConsumeTopic, false)
.getSubscriptions().get(testSub).isReplicated());
@Cleanup
Consumer<byte[]> consumeFromConsumeTopic = pulsarClient.newConsumer(Schema.BYTES)
.consumerName("perf-transaction-consumeVerify")
.topic(testConsumeTopic)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName(testSub)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
@Cleanup
Consumer<byte[]> consumeFromProduceTopic = pulsarClient.newConsumer(Schema.BYTES)
.consumerName("perf-transaction-produceVerify")
.topic(testProduceTopic)
Expand All @@ -160,6 +166,7 @@ public void testTxnPerf() throws Exception {
Assert.assertNull(message);
message = consumeFromProduceTopic.receive(2, TimeUnit.SECONDS);
Assert.assertNull(message);

}


Expand All @@ -169,7 +176,8 @@ public void testProduceTxnMessage() throws InterruptedException, PulsarClientExc
String topic = testTopic + UUID.randomUUID();
int totalMessage = 100;
String args = String.format(argString, topic, pulsar.getBrokerServiceUrl(), totalMessage);
pulsarClient.newConsumer().subscriptionName("subName" + "pre").topic(topic)
@Cleanup
final Consumer<byte[]> subscribe = pulsarClient.newConsumer().subscriptionName("subName" + "pre").topic(topic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionType(SubscriptionType.Exclusive)
.enableBatchIndexAcknowledgment(false)
Expand All @@ -190,6 +198,7 @@ public void testProduceTxnMessage() throws InterruptedException, PulsarClientExc
});
});

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer().subscriptionName("subName").topic(topic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionType(SubscriptionType.Exclusive)
Expand All @@ -211,9 +220,11 @@ public void testConsumeTxnMessage() throws Exception {
String topic = testTopic + UUID.randomUUID();
String args = String.format(argString, topic, pulsar.getBrokerServiceUrl(), subName,
SubscriptionType.Exclusive, SubscriptionInitialPosition.Earliest, 10);
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).sendTimeout(0, TimeUnit.SECONDS)
.create();
pulsarClient.newConsumer(Schema.BYTES)
@Cleanup
final Consumer<byte[]> subscribe = pulsarClient.newConsumer(Schema.BYTES)
.consumerName("perf-transaction-consumeVerify")
.topic(topic)
.subscriptionType(SubscriptionType.Shared)
Expand All @@ -240,6 +251,7 @@ public void testConsumeTxnMessage() throws Exception {
});
});

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer().subscriptionName(subName).topic(topic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionType(SubscriptionType.Exclusive)
Expand All @@ -252,6 +264,7 @@ public void testConsumeTxnMessage() throws Exception {
}
Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
Assert.assertNull(message);

}

}

0 comments on commit 9e8bf7a

Please sign in to comment.