Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[testclient] Add new option --num-messages for consumer and reader #12016

Merged
merged 1 commit into from
Sep 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public void runPerformanceTest(long messages, long limit, int numOfTopic, int si
for (String topic : producersMap.keySet()) {
if (messages > 0) {
if (totalSent >= messages) {
log.trace("------------------- DONE -----------------------");
log.trace("------------- DONE (reached the maximum number: [{}] of production) --------------", messages);
Thread.sleep(10000);
PerfClientUtils.exit(0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,6 @@ public static void main(String[] args) throws Exception {
PerfClientUtils.exit(-1);
}

arguments.testTime = TimeUnit.SECONDS.toMillis(arguments.testTime);

// Dump config variables
PerfClientUtils.printJVMInformation(log);
ObjectMapper m = new ObjectMapper();
Expand Down Expand Up @@ -240,7 +238,8 @@ public void run() {

// Acquire 1 sec worth of messages to have a slower ramp-up
rateLimiter.acquire((int) msgRate);
final long startTime = System.currentTimeMillis();
final long startTime = System.nanoTime();
final long testEndTime = startTime + (long) (arguments.testTime * 1e9);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend using TimeUnit.SECONDS.toNanos() instead of 1e9 to make it more understandable to other developers


final Semaphore semaphore = new Semaphore(maxOutstandingForThisThread);

Expand Down Expand Up @@ -270,8 +269,8 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
while (true) {
for (int j = 0; j < nunManagedLedgersForThisThread; j++) {
if (arguments.testTime > 0) {
if (System.currentTimeMillis() - startTime > arguments.testTime) {
log.info("------------------- DONE -----------------------");
if (System.nanoTime() > testEndTime) {
log.info("------------- DONE (reached the maximum duration: [{} seconds] of production) --------------", arguments.testTime);
printAggregatedStats();
isDone.set(true);
Thread.sleep(5000);
Expand All @@ -281,7 +280,7 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {

if (numMessagesForThisThread > 0) {
if (totalSent++ >= numMessagesForThisThread) {
log.info("------------------- DONE -----------------------");
log.info("------------- DONE (reached the maximum number: [{}] of production) --------------", numMessagesForThisThread);
printAggregatedStats();
isDone.set(true);
Thread.sleep(5000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ static class Arguments {
@Parameter(names = { "--acks-delay-millis" }, description = "Acknowledgements grouping delay in millis")
public int acknowledgmentsGroupingDelayMillis = 100;

@Parameter(names = {"-m",
"--num-messages"}, description = "Number of messages to consume in total. If <= 0, it will keep consuming")
public long numMessages = 0;

@Parameter(names = { "-c",
"--max-connections" }, description = "Max number of TCP connections to a single broker")
public int maxConnections = 100;
Expand Down Expand Up @@ -288,11 +292,16 @@ public static void main(String[] args) throws Exception {
MessageListener<ByteBuffer> listener = (consumer, msg) -> {
if (arguments.testTime > 0) {
if (System.nanoTime() > testEndTime) {
log.info("------------------- DONE -----------------------");
log.info("------------- DONE (reached the maximum duration: [{} seconds] of consumption) --------------", arguments.testTime);
printAggregatedStats();
PerfClientUtils.exit(0);
}
}
if (arguments.numMessages > 0 && totalMessagesReceived.sum() >= arguments.numMessages) {
log.info("------------- DONE (reached the maximum number: [{}] of consumption) --------------", arguments.numMessages);
printAggregatedStats();
PerfClientUtils.exit(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a dummy 'return' statement.

}
messagesReceived.increment();
bytesReceived.add(msg.size());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ private static void runProducer(int producerId,
for (Producer<byte[]> producer : producers) {
if (arguments.testTime > 0) {
if (System.nanoTime() > testEndTime) {
log.info("------------------- DONE -----------------------");
log.info("------------- DONE (reached the maximum duration: [{} seconds] of production) --------------", arguments.testTime);
printAggregatedStats();
doneLatch.countDown();
Thread.sleep(5000);
Expand All @@ -611,7 +611,7 @@ private static void runProducer(int producerId,

if (numMessages > 0) {
if (totalSent++ >= numMessages) {
log.info("------------------- DONE -----------------------");
log.info("------------- DONE (reached the maximum number: {} of production) --------------", numMessages);
printAggregatedStats();
doneLatch.countDown();
Thread.sleep(5000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ static class Arguments {
@Parameter(names = { "-q", "--receiver-queue-size" }, description = "Size of the receiver queue")
public int receiverQueueSize = 1000;

@Parameter(names = {"-n",
"--num-messages"}, description = "Number of messages to consume in total. If <= 0, it will keep consuming")
public long numMessages = 0;

@Parameter(names = { "-c",
"--max-connections" }, description = "Max number of TCP connections to a single broker")
public int maxConnections = 100;
Expand Down Expand Up @@ -215,10 +219,15 @@ public static void main(String[] args) throws Exception {
ReaderListener<byte[]> listener = (reader, msg) -> {
if (arguments.testTime > 0) {
if (System.nanoTime() > testEndTime) {
log.info("------------------- DONE -----------------------");
log.info("------------- DONE (reached the maximum duration: [{} seconds] of consumption) --------------", arguments.testTime);
PerfClientUtils.exit(0);
}
}
if (arguments.numMessages > 0 && totalMessagesReceived.sum() >= arguments.numMessages) {
log.info("------------- DONE (reached the maximum number: [{}] of consumption) --------------", arguments.numMessages);
printAggregatedStats();
PerfClientUtils.exit(0);
}
messagesReceived.increment();
bytesReceived.add(msg.getData().length);

Expand Down
2 changes: 2 additions & 0 deletions site2/docs/reference-cli-tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ Options
|`-v`, `--encryption-key-value-file`|The file which contains the private key to decrypt payload||
|`-h`, `--help`|Help message|false|
|`--conf-file`|Configuration file||
|`-m`, `--num-messages`|Number of messages to consume in total. If the value is equal to or smaller than 0, it keeps consuming messages.|0|
|`-c`, `--max-connections`|Max number of TCP connections to a single broker|100|
|`-n`, `--num-consumers`|Number of consumers (per topic)|1|
|`-t`, `--num-topics`|The number of topics|1|
Expand Down Expand Up @@ -508,6 +509,7 @@ Options
|`--listener-name`|Listener name for the broker||
|`--conf-file`|Configuration file||
|`-h`, `--help`|Help message|false|
|`-n`, `--num-messages`|Number of messages to consume in total. If the value is equal to or smaller than 0, it keeps consuming messages.|0|
|`-c`, `--max-connections`|Max number of TCP connections to a single broker|100|
|`-t`, `--num-topics`|The number of topics|1|
|`-r`, `--rate`|Simulate a slow message reader (rate in msg/s)|0|
Expand Down