Skip to content

Commit 45c0ecd

Browse files
author
YunaiV
committed
初始化 rabbitmq 示例
1 parent 3a90d74 commit 45c0ecd

File tree

2 files changed

+41
-38
lines changed

2 files changed

+41
-38
lines changed

lab-04/lab-04-rabbitmq-native/src/main/java/cn/iocoder/springboot/lab04/rabbitmqdemo/RabbitMQConsumer.java

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,41 +8,28 @@
88

99
public class RabbitMQConsumer {
1010

11-
private static final String EXCHANGE_NAME = "exchange_demo";
12-
private static final String ROUTING_KEY = "routingkey_demo";
13-
private static final String QUEUE_NAME = "queue_demo";
14-
private static final String IP_ADDRESS = "127.0.0.1";
15-
public static final Integer PORT = 5672;
16-
1711
public static void main(String[] args) throws IOException, TimeoutException {
1812
// 创建连接
19-
ConnectionFactory factory = new ConnectionFactory();
20-
factory.setHost(IP_ADDRESS);
21-
factory.setPort(PORT);
22-
factory.setUsername("guest");
23-
factory.setPassword("guest");
24-
Connection connection = factory.newConnection();
13+
Connection connection = RabbitMQProducer.getConnection();
14+
2515
// 创建信道
2616
final Channel channel = connection.createChannel();
2717
channel.basicQos(64); // 设置客户端最多接收未被 ack 的消息数量为 64 。
28-
// 创建交换器:direct、持久化、不自动删除
29-
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
3018

3119
// 创建消费者
3220
Consumer consumer = new DefaultConsumer(channel) {
3321

3422
@Override
3523
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
24+
// 打印日志
3625
System.out.println("线程:" + Thread.currentThread() + ":消息:" + new String(body));
37-
try {
38-
TimeUnit.SECONDS.sleep(1);
39-
} catch (InterruptedException ignore) {
40-
}
26+
// ack 消息已经消费
4127
channel.basicAck(envelope.getDeliveryTag(), false);
4228
}
4329

4430
};
45-
channel.basicConsume(QUEUE_NAME, consumer);
31+
// 订阅消费 QUEUE_NAME 队列
32+
channel.basicConsume(RabbitMQProducer.QUEUE_NAME, consumer);
4633

4734
// 关闭
4835
try {

lab-04/lab-04-rabbitmq-native/src/main/java/cn/iocoder/springboot/lab04/rabbitmqdemo/RabbitMQProducer.java

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,33 +10,27 @@
1010

1111
public class RabbitMQProducer {
1212

13+
private static final String IP_ADDRESS = "127.0.0.1";
14+
private static final Integer PORT = 5672;
15+
private static final String USERNAME = "guest";
16+
private static final String PASSWORD = "guest";
17+
1318
private static final String EXCHANGE_NAME = "exchange_demo";
1419
private static final String ROUTING_KEY = "routingkey_demo";
15-
private static final String QUEUE_NAME = "queue_demo";
16-
private static final String IP_ADDRESS = "127.0.0.1";
17-
public static final Integer PORT = 5672;
20+
public static final String QUEUE_NAME = "queue_demo"; // 只有 QUEUE_NAME 需要共享给 RabbitMQConsumer
1821

1922
public static void main(String[] args) throws IOException, TimeoutException {
2023
// 创建连接
21-
ConnectionFactory factory = new ConnectionFactory();
22-
factory.setHost(IP_ADDRESS);
23-
factory.setPort(PORT);
24-
factory.setUsername("guest");
25-
factory.setPassword("guest");
26-
Connection connection = factory.newConnection();
24+
Connection connection = getConnection();
25+
2726
// 创建信道
2827
Channel channel = connection.createChannel();
29-
// 创建交换器:direct、持久化、不自动删除
30-
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
31-
// 创建队列:持久化、非排他、非自动删除的队列
32-
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
33-
// channel.queueDeclare(QUEUE_NAME + "_2", true, false, false, null);
34-
// 将交换器与队列通过路由键绑定
35-
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
36-
// channel.queueBind(QUEUE_NAME + "_2", EXCHANGE_NAME, ROUTING_KEY);
3728

38-
// 发送一条消息
39-
for (int i = 0; i < 1000; i++) {
29+
// 初始化测试用的 Exchange 和 Queue
30+
initExchangeAndQueue(channel);
31+
32+
// 发送 3 条消息
33+
for (int i = 0; i < 3; i++) {
4034
String message = "Hello World" + i;
4135
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
4236
}
@@ -46,4 +40,26 @@ public static void main(String[] args) throws IOException, TimeoutException {
4640
connection.close();
4741
}
4842

43+
public static Connection getConnection() throws IOException, TimeoutException {
44+
ConnectionFactory factory = new ConnectionFactory();
45+
factory.setHost(IP_ADDRESS);
46+
factory.setPort(PORT);
47+
factory.setUsername(USERNAME);
48+
factory.setPassword(PASSWORD);
49+
return factory.newConnection();
50+
}
51+
52+
// 创建 RabbitMQ Exchange 和 Queue ,然后使用 ROUTING_KEY 路由键将两者绑定。
53+
// 该步骤,其实可以在 RabbitMQ Management 上操作,并不一定需要在代码中
54+
private static void initExchangeAndQueue(Channel channel) throws IOException {
55+
// 创建交换器:direct、持久化、不自动删除
56+
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
57+
58+
// 创建队列:持久化、非排他、非自动删除的队列
59+
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
60+
61+
// 将交换器与队列通过路由键绑定
62+
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
63+
}
64+
4965
}

0 commit comments

Comments
 (0)