Skip to content

Commit f4c6084

Browse files
Merge pull request rabbitmq#48 from rabbitmq/rabbitmq-java-client-41
Introduce Channel#messageCount
2 parents 648d554 + 0405c61 commit f4c6084

File tree

4 files changed

+47
-6
lines changed

4 files changed

+47
-6
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -899,4 +899,14 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
899899
* @throws IOException Problem transmitting method.
900900
*/
901901
Command rpc(Method method) throws IOException;
902+
903+
/**
904+
* Returns the number of messages in a queue ready to be delivered
905+
* to consumers. This method assumes the queue exists. If it doesn't,
906+
* an exception will be closed with an exception.
907+
* @param queue the name of the queue
908+
* @return the number of messages in ready state
909+
* @throws IOException Problem transmitting method.
910+
*/
911+
long messageCount(String queue) throws IOException;
902912
}

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -649,12 +649,12 @@ public void basicPublish(String exchange, String routingKey,
649649
useProps = MessageProperties.MINIMAL_BASIC;
650650
}
651651
transmit(new AMQCommand(new Basic.Publish.Builder()
652-
.exchange(exchange)
653-
.routingKey(routingKey)
654-
.mandatory(mandatory)
655-
.immediate(immediate)
656-
.build(),
657-
useProps, body));
652+
.exchange(exchange)
653+
.routingKey(routingKey)
654+
.mandatory(mandatory)
655+
.immediate(immediate)
656+
.build(),
657+
useProps, body));
658658
}
659659

660660
/** Public API - {@inheritDoc} */
@@ -883,6 +883,12 @@ public Queue.DeclareOk queueDeclarePassive(String queue)
883883
.getMethod();
884884
}
885885

886+
/** Public API - {@inheritDoc} */
887+
public long messageCount(String queue) throws IOException {
888+
Queue.DeclareOk ok = queueDeclarePassive(queue);
889+
return ok.getMessageCount();
890+
}
891+
886892
/** Public API - {@inheritDoc} */
887893
public Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty)
888894
throws IOException

src/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,10 @@ public AMQP.Queue.DeclareOk queueDeclarePassive(String queue) throws IOException
287287
return delegate.queueDeclarePassive(queue);
288288
}
289289

290+
public long messageCount(String queue) throws IOException {
291+
return delegate.messageCount(queue);
292+
}
293+
290294
public AMQP.Queue.DeleteOk queueDelete(String queue) throws IOException {
291295
return queueDelete(queue, false, false);
292296
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.rabbitmq.client.test.functional;
2+
3+
import com.rabbitmq.client.test.BrokerTestCase;
4+
5+
import java.io.IOException;
6+
7+
public class MessageCount extends BrokerTestCase {
8+
public void testMessageCount() throws IOException {
9+
String q = generateQueueName();
10+
channel.queueDeclareNoWait(q, false, true, true, null);
11+
assertEquals(0, channel.messageCount(q));
12+
13+
basicPublishVolatile(q);
14+
assertEquals(1, channel.messageCount(q));
15+
basicPublishVolatile(q);
16+
assertEquals(2, channel.messageCount(q));
17+
18+
channel.queuePurge(q);
19+
assertEquals(0, channel.messageCount(q));
20+
}
21+
}

0 commit comments

Comments
 (0)