Skip to content

Commit f7bce88

Browse files
author
YunaiV
committed
增加 rabbitmq 示例
1 parent a95edcf commit f7bce88

File tree

6 files changed

+159
-6
lines changed

6 files changed

+159
-6
lines changed

lab-04/lab-04-rabbitmq-demo/src/main/java/cn/iocoder/springboot/lab04/rabbitmqdemo/config/RabbitConfig.java

+37-5
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
package cn.iocoder.springboot.lab04.rabbitmqdemo.config;
22

33
import cn.iocoder.springboot.lab04.rabbitmqdemo.message.Demo01Message;
4-
import org.springframework.amqp.core.Binding;
5-
import org.springframework.amqp.core.BindingBuilder;
6-
import org.springframework.amqp.core.DirectExchange;
7-
import org.springframework.amqp.core.Queue;
4+
import cn.iocoder.springboot.lab04.rabbitmqdemo.message.Demo02Message;
5+
import org.springframework.amqp.core.*;
86
import org.springframework.context.annotation.Bean;
97
import org.springframework.context.annotation.Configuration;
108

@@ -25,7 +23,7 @@ public Queue demo01Queue() {
2523
false); // autoDelete: 是否自动删除
2624
}
2725

28-
// 创建 Exchange
26+
// 创建 Direct Exchange
2927
@Bean
3028
public DirectExchange demo01Exchange() {
3129
return new DirectExchange(Demo01Message.EXCHANGE,
@@ -44,4 +42,38 @@ public Binding demo01Binding() {
4442

4543
}
4644

45+
/**
46+
* Topic Exchange 示例的配置类
47+
*/
48+
public static class TopicExchangeDemoConfiguration {
49+
50+
// 创建 Queue
51+
@Bean
52+
public Queue demo02Queue() {
53+
return new Queue(Demo02Message.QUEUE, // Queue 名字
54+
true, // durable: 是否持久化
55+
false, // exclusive: 是否排它
56+
false); // autoDelete: 是否自动删除
57+
}
58+
59+
// 创建 Topic Exchange
60+
@Bean
61+
public TopicExchange demo02Exchange() {
62+
return new TopicExchange(Demo02Message.EXCHANGE,
63+
true, // durable: 是否持久化
64+
false); // exclusive: 是否排它
65+
}
66+
67+
// 创建 Binding
68+
// Exchange:Demo02Message.EXCHANGE
69+
// Routing key:Demo02Message.ROUTING_KEY
70+
// Queue:Demo02Message.QUEUE
71+
@Bean
72+
public Binding demo02Binding() {
73+
return BindingBuilder.bind(demo02Queue()).to(demo02Exchange()).with(Demo01Message.ROUTING_KEY);
74+
}
75+
76+
77+
}
78+
4779
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package cn.iocoder.springboot.lab04.rabbitmqdemo.consumer;
2+
3+
import cn.iocoder.springboot.lab04.rabbitmqdemo.message.Demo02Message;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
7+
import org.springframework.amqp.rabbit.annotation.RabbitListener;
8+
import org.springframework.stereotype.Component;
9+
10+
@Component
11+
@RabbitListener(queues = Demo02Message.QUEUE)
12+
public class Demo02Consumer {
13+
14+
private Logger logger = LoggerFactory.getLogger(getClass());
15+
16+
@RabbitHandler
17+
public void onMessage(Demo02Message message) {
18+
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
19+
}
20+
21+
}

lab-04/lab-04-rabbitmq-demo/src/main/java/cn/iocoder/springboot/lab04/rabbitmqdemo/message/Demo01Message.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ public class Demo01Message implements Serializable {
88

99
public static final String EXCHANGE = "EXCHANGE_DEMO_01";
1010

11-
public static final String ROUTING_KEY = "ROUTING_KEY_01";
11+
public static final String ROUTING_KEY = "#.yu.nai";
1212

1313
/**
1414
* 编号
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package cn.iocoder.springboot.lab04.rabbitmqdemo.message;
2+
3+
import java.io.Serializable;
4+
5+
public class Demo02Message implements Serializable {
6+
7+
public static final String QUEUE = "QUEUE_DEMO_02";
8+
9+
public static final String EXCHANGE = "EXCHANGE_DEMO_02";
10+
11+
public static final String ROUTING_KEY = "ROUTING_KEY_02";
12+
13+
/**
14+
* 编号
15+
*/
16+
private Integer id;
17+
18+
public Demo02Message setId(Integer id) {
19+
this.id = id;
20+
return this;
21+
}
22+
23+
public Integer getId() {
24+
return id;
25+
}
26+
27+
@Override
28+
public String toString() {
29+
return "Demo02Message{" +
30+
"id=" + id +
31+
'}';
32+
}
33+
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package cn.iocoder.springboot.lab04.rabbitmqdemo.producer;
2+
3+
import cn.iocoder.springboot.lab04.rabbitmqdemo.message.Demo02Message;
4+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
5+
import org.springframework.beans.factory.annotation.Autowired;
6+
import org.springframework.stereotype.Component;
7+
8+
@Component
9+
public class Demo02Producer {
10+
11+
@Autowired
12+
private RabbitTemplate rabbitTemplate;
13+
14+
public void syncSend(Integer id, String routingKey) {
15+
// 创建 Demo02Message 消息
16+
Demo02Message message = new Demo02Message();
17+
message.setId(id);
18+
// 同步发送消息
19+
rabbitTemplate.convertAndSend(Demo02Message.EXCHANGE, routingKey, message);
20+
}
21+
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package cn.iocoder.springboot.lab04.rabbitmqdemo.producer;
2+
3+
import cn.iocoder.springboot.lab04.rabbitmqdemo.Application;
4+
import org.junit.Test;
5+
import org.junit.runner.RunWith;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
import org.springframework.beans.factory.annotation.Autowired;
9+
import org.springframework.boot.test.context.SpringBootTest;
10+
import org.springframework.test.context.junit4.SpringRunner;
11+
12+
import java.util.concurrent.CountDownLatch;
13+
14+
@RunWith(SpringRunner.class)
15+
@SpringBootTest(classes = Application.class)
16+
public class Demo02ProducerTest {
17+
18+
private Logger logger = LoggerFactory.getLogger(getClass());
19+
20+
@Autowired
21+
private Demo02Producer producer;
22+
23+
@Test
24+
public void testSyncSendSuccess() throws InterruptedException {
25+
int id = (int) (System.currentTimeMillis() / 1000);
26+
producer.syncSend(id, "da.yu.nai");
27+
logger.info("[testSyncSend][发送编号:[{}] 发送成功]", id);
28+
29+
// 阻塞等待,保证消费
30+
new CountDownLatch(1).await();
31+
}
32+
33+
@Test
34+
public void testSyncSendFailure() throws InterruptedException {
35+
int id = (int) (System.currentTimeMillis() / 1000);
36+
producer.syncSend(id, "yu.nai.shuai");
37+
logger.info("[testSyncSend][发送编号:[{}] 发送成功]", id);
38+
39+
// 阻塞等待,保证消费
40+
new CountDownLatch(1).await();
41+
}
42+
43+
44+
}

0 commit comments

Comments
 (0)