Skip to content

Commit eccba6e

Browse files
Introduce Channel#consumerCount
1 parent f4c6084 commit eccba6e

File tree

5 files changed

+41
-1
lines changed

5 files changed

+41
-1
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -909,4 +909,14 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
909909
* @throws IOException Problem transmitting method.
910910
*/
911911
long messageCount(String queue) throws IOException;
912+
913+
/**
914+
* Returns the number of consumers on a queue.
915+
* This method assumes the queue exists. If it doesn't,
916+
* an exception will be closed with an exception.
917+
* @param queue the name of the queue
918+
* @return the number of consumers
919+
* @throws IOException Problem transmitting method.
920+
*/
921+
long consumerCount(String queue) throws IOException;
912922
}

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -889,6 +889,12 @@ public long messageCount(String queue) throws IOException {
889889
return ok.getMessageCount();
890890
}
891891

892+
/** Public API - {@inheritDoc} */
893+
public long consumerCount(String queue) throws IOException {
894+
Queue.DeclareOk ok = queueDeclarePassive(queue);
895+
return ok.getConsumerCount();
896+
}
897+
892898
/** Public API - {@inheritDoc} */
893899
public Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty)
894900
throws IOException

src/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,10 @@ public long messageCount(String queue) throws IOException {
291291
return delegate.messageCount(queue);
292292
}
293293

294+
public long consumerCount(String queue) throws IOException {
295+
return delegate.consumerCount(queue);
296+
}
297+
294298
public AMQP.Queue.DeleteOk queueDelete(String queue) throws IOException {
295299
return queueDelete(queue, false, false);
296300
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.rabbitmq.client.test.functional;
2+
3+
import com.rabbitmq.client.DefaultConsumer;
4+
import com.rabbitmq.client.test.BrokerTestCase;
5+
6+
import java.io.IOException;
7+
8+
public class ConsumerCount extends BrokerTestCase {
9+
public void testConsumerCount() throws IOException {
10+
String q = generateQueueName();
11+
channel.queueDeclare(q, false, true, false, null);
12+
assertEquals(0, channel.consumerCount(q));
13+
14+
String tag = channel.basicConsume(q, new DefaultConsumer(channel));
15+
assertEquals(1, channel.consumerCount(q));
16+
17+
channel.basicCancel(tag);
18+
assertEquals(0, channel.consumerCount(q));
19+
}
20+
}

test/src/com/rabbitmq/client/test/functional/MessageCount.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
public class MessageCount extends BrokerTestCase {
88
public void testMessageCount() throws IOException {
99
String q = generateQueueName();
10-
channel.queueDeclareNoWait(q, false, true, true, null);
10+
channel.queueDeclare(q, false, true, false, null);
1111
assertEquals(0, channel.messageCount(q));
1212

1313
basicPublishVolatile(q);

0 commit comments

Comments
 (0)