Skip to content

Commit 0697ac9

Browse files
committed
springboot-rabbitmq complete
1 parent 98dd1fe commit 0697ac9

File tree

9 files changed

+477
-27
lines changed

9 files changed

+477
-27
lines changed

springboot-rabbitmq/README.md

Lines changed: 259 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,274 @@
1-
## SpringBoot整合RabbitMQ
2-
3-
RabbitMQ是基于AMQP的一款消息管理系统,RabbitMQ基于Erlang语言开发,安装之前需要先安装Erlang的相关依赖。
4-
1+
## SpringBoot整合RabbitMQ实现五种消息模型
52
RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。那么也就剩下5种。
63

4+
- 基本消息模型:生产者-->队列-->消费者
5+
- work消息模型:生产者-->队列-->多个消费者共同消费
6+
- 订阅模型-Fanout:广播,将消息交给所有绑定到交换机的队列,每个消费者都会收到同一条消息
7+
- 订阅模型-Direct:定向,把消息交给符合指定 `rotingKey` 的队列
8+
- 订阅模型-Topic:通配符,把消息交给符合`routing pattern`(路由模式) 的队列
9+
710
但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同。
11+
![在这里插入图片描述](https://img-blog.csdnimg.cn/201904211443428.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM4NzYyMjM3,size_16,color_FFFFFF,t_70)
12+
## 准备工作
13+
**相关软件的安装**
14+
15+
RabbitMQ官方教程:http://www.rabbitmq.com/getstarted.html
16+
RabbitMQ官网下载地址:http://www.rabbitmq.com/download.html
17+
Erlang下载地址:http://www.erlang.org/download.html
18+
RabbitMQ五种消息模型介绍:https://blog.csdn.net/qq_38762237/article/details/89416444
19+
20+
或者群文件夹下载,QQ群:957406675
21+
22+
**依赖**
23+
```xml
24+
<dependency>
25+
<groupId>org.springframework.boot</groupId>
26+
<artifactId>spring-boot-starter-amqp</artifactId>
27+
</dependency>
28+
```
29+
**application.yml**
30+
```yml
31+
spring:
32+
rabbitmq:
33+
host: 127.0.0.1
34+
username: tellsea
35+
password: 123456
36+
virtual-host: /tellsea-host
37+
```
38+
## simple消息模型
39+
40+
> Spring AMQP提供的‘template’扮演者关键的角色。定义主要操作的接口是AmqpTemplate。
41+
```java
42+
@Autowired
43+
private AmqpTemplate amqpTemplate;
44+
```
45+
46+
**发送消息**
47+
```java
48+
@Test
49+
public void simple() throws InterruptedException {
50+
String msg = "RabbitMQ simple ...";
51+
for (int i = 0; i < 10; i++) {
52+
amqpTemplate.convertAndSend("spring.simple.queue", msg + i);
53+
}
54+
Thread.sleep(5000);
55+
}
56+
```
57+
**接收消息**
58+
```java
59+
/**
60+
* simple:生产者-->队列-->消费者
61+
*/
62+
@Component
63+
public class SimpleListener {
64+
65+
// 通过注解自动创建 spring.simple.queue 队列
66+
@RabbitListener(queuesToDeclare = @Queue("spring.simple.queue"))
67+
public void listen(String msg) {
68+
System.out.println("SimpleListener listen 接收到消息:" + msg);
69+
}
70+
}
71+
```
72+
## work消息模型
73+
在刚才的基本模型中,一个生产者,一个消费者,生产的消息直接被消费者消费。比较简单。
874

9-
[RabbitMQ官网](http://www.rabbitmq.com/#)
10-
[RabbitMQ官方教程](http://www.rabbitmq.com/getstarted.html#)
11-
[RabbitMQ下载地址](http://www.rabbitmq.com/download.html#)
12-
[Erlang下载地址](http://www.erlang.org/download.html#)
13-
[RabbitMQ相关知识讲解](https://blog.csdn.net/qq_38762237/article/details/89416444#)
14-
[RabbitMQ在Windows的安装教程](https://blog.csdn.net/weixin_39735923/article/details/79288578#)
75+
Work queues,也被称为(Task queues),任务模型。
1576

16-
## 准备
77+
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:**让多个消费者绑定到一个队列,共同消费队列中的消息**。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
1778

18-
参照上面安装教程的讲解,安装好以下三个软件:
79+
**发送消息**
80+
```java
81+
@Test
82+
public void work() throws InterruptedException {
83+
String msg = "RabbitMQ simple ...";
84+
for (int i = 0; i < 10; i++) {
85+
amqpTemplate.convertAndSend("spring.work.queue", msg + i);
86+
}
87+
Thread.sleep(5000);
88+
}
89+
```
90+
**接收消息**
91+
```java
92+
@Component
93+
public class WorkListener {
1994

20-
- Erlang,由于是基于Erlang的中间件,所以必须安装,且配置环境变量
21-
- RabbitMQ,双击安装,默认的即可
22-
- sbin目录下安装图形界面:使用rabbitmq-plugins enable rabbitmq_management命令进行安装
95+
// 通过注解自动创建 spring.work.queue 队列
96+
@RabbitListener(queuesToDeclare = @Queue("spring.work.queue"))
97+
public void listen(String msg) {
98+
System.out.println("WorkListener listen 接收到消息:" + msg);
99+
}
23100

24-
运行sbin/rabbitmq-server
101+
// 创建两个队列共同消费
102+
@RabbitListener(queuesToDeclare = @Queue("spring.work.queue"))
103+
public void listen2(String msg) {
104+
System.out.println("WorkListener listen2 接收到消息:" + msg);
105+
}
106+
}
107+
```
108+
## 订阅模型-Fanout
109+
Fanout,也称为广播。在广播模式下,消息发送流程是这样的:
25110

26-
使用浏览器访问:http://localhost:15672,默认登录用户名:guest,密码为:guest
111+
- 1) 可以有多个消费者
112+
- 2) 每个**消费者有自己的queue**(队列)
113+
- 3) 每个**队列都要绑定到Exchange**(交换机)
114+
- 4) **生产者发送的消息,只能发送到交换机**,交换机来决定要发给哪个队列,生产者无法决定。
115+
- 5) 交换机把消息发送给绑定过的所有队列
116+
- 6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
27117

28-
**添加用户**
118+
**发送消息**
119+
```java
120+
@Test
121+
public void fanout() throws InterruptedException {
122+
String msg = "RabbitMQ fanout ...";
123+
for (int i = 0; i < 10; i++) {
124+
// 这里注意细节,第二个参数需要写,不然第一个参数就变成routingKey了
125+
amqpTemplate.convertAndSend("spring.fanout.exchange", "", msg + i);
126+
}
127+
Thread.sleep(5000);
128+
}
129+
```
130+
**接收消息**
131+
```java
132+
/**
133+
* Fanout:广播,将消息交给所有绑定到交换机的队列,每个消费者都会收到同一条消息
134+
*/
135+
@Component
136+
public class FanoutListener {
29137

30-
admin模块下添加一个用户,建议和我使用一样的,避免之后产生混淆:
138+
@RabbitListener(bindings = @QueueBinding(
139+
value = @Queue(value = "spring.fanout.queue", durable = "true"),
140+
exchange = @Exchange(
141+
value = "spring.fanout.exchange",
142+
ignoreDeclarationExceptions = "true",
143+
type = ExchangeTypes.FANOUT
144+
)
145+
))
146+
public void listen(String msg) {
147+
System.out.println("FanoutListener listen 接收到消息:" + msg);
148+
}
31149

32-
|属性||
150+
// 队列2(第二个人),同样能接收到消息
151+
@RabbitListener(bindings = @QueueBinding(
152+
value = @Queue(value = "spring.fanout2.queue", durable = "true"),
153+
exchange = @Exchange(
154+
value = "spring.fanout.exchange",
155+
ignoreDeclarationExceptions = "true",
156+
type = ExchangeTypes.FANOUT
157+
)
158+
))
159+
public void listen2(String msg) {
160+
System.out.println("FanoutListener listen2 接收到消息:" + msg);
161+
}
162+
}
163+
```
164+
## 订阅模型-Direct
165+
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
166+
167+
在Direct模型下:
168+
169+
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个`RoutingKey`(路由key)
170+
- 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 `RoutingKey`
171+
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的`Routing Key`进行判断,只有队列的`Routingkey`与消息的 `Routing key`完全一致,才会接收到消息
172+
173+
**发送消息**
174+
```java
175+
@Test
176+
public void direct() throws InterruptedException {
177+
String msg = "RabbitMQ direct ...";
178+
for (int i = 0; i < 10; i++) {
179+
amqpTemplate.convertAndSend("spring.direct.exchange", "direct", msg + i);
180+
}
181+
Thread.sleep(5000);
182+
}
183+
```
184+
**接收消息**
185+
```java
186+
/**
187+
* Direct:定向,把消息交给符合指定routing key 的队列
188+
*/
189+
@Component
190+
public class DirectListener {
191+
192+
@RabbitListener(bindings = @QueueBinding(
193+
value = @Queue(value = "spring.direct.queue", durable = "true"),
194+
exchange = @Exchange(
195+
value = "spring.direct.exchange",
196+
ignoreDeclarationExceptions = "true"
197+
),
198+
key = {"direct"}
199+
))
200+
public void listen(String msg) {
201+
System.out.println("DirectListener listen 接收到消息:" + msg);
202+
}
203+
204+
// 队列2(第二个人),key值不同,接收不到消息
205+
@RabbitListener(bindings = @QueueBinding(
206+
value = @Queue(value = "spring.direct2.queue", durable = "true"),
207+
exchange = @Exchange(
208+
value = "spring.direct.exchange",
209+
ignoreDeclarationExceptions = "true"
210+
),
211+
key = {"direct-test"}
212+
))
213+
public void listen2(String msg) {
214+
System.out.println("DirectListener listen2 接收到消息:" + msg);
215+
}
216+
}
217+
```
218+
## 订阅模型-Topic
219+
`Topic`类型的`Exchange``Direct`相比,都是可以根据`RoutingKey`把消息路由到不同的队列。只不过`Topic`类型`Exchange`可以让队列在绑定`Routing key` 的时候使用通配符!
220+
221+
`Routingkey` 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: `user.insert`
222+
223+
|通配符规则 |举例 |
33224
|:--|:--|
34-
|Username|tellsea|
35-
|password|123456|
36-
Tags|administrator|
225+
| `#`:匹配一个或多个词 | `user.#`:能够匹配`user.insert.save` 或者 `user.insert` |
226+
| `*`:匹配不多不少恰好1个词 | `user.*`:只能匹配`user.insert` |
227+
228+
**发送消息**
229+
```java
230+
@Test
231+
public void topic() throws InterruptedException {
232+
amqpTemplate.convertAndSend("spring.topic.exchange", "user.insert", "新增用户");
233+
amqpTemplate.convertAndSend("spring.topic.exchange", "user.delete", "删除用户");
234+
amqpTemplate.convertAndSend("spring.topic.exchange", "student.insert", "新增学生");
235+
amqpTemplate.convertAndSend("spring.topic.exchange", "student.delete", "删除学生");
236+
Thread.sleep(5000);
237+
}
238+
```
239+
**接收消息**
240+
```java
241+
/**
242+
* Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
243+
*/
244+
@Component
245+
public class TopicListener {
37246

38-
**创建虚拟主机**
247+
@RabbitListener(bindings = @QueueBinding(
248+
value = @Queue(value = "spring.topic.queue", durable = "true"),
249+
exchange = @Exchange(
250+
value = "spring.topic.exchange",
251+
ignoreDeclarationExceptions = "true",
252+
type = ExchangeTypes.TOPIC
253+
),
254+
key = {"user.*"}
255+
))
256+
public void listen(String msg) {
257+
System.out.println("TopicListener User 接收到消息:" + msg);
258+
}
39259

40-
Virtual Hosts名称设置为:/tellsea-host,点击主机名称授权用户tellsea
260+
// 通配规则不同,接收不到消息
261+
@RabbitListener(bindings = @QueueBinding(
262+
value = @Queue(value = "spring.topic.queue", durable = "true"),
263+
exchange = @Exchange(
264+
value = "spring.topic.exchange",
265+
ignoreDeclarationExceptions = "true",
266+
type = ExchangeTypes.TOPIC
267+
),
268+
key = {"student.*"}
269+
))
270+
public void listen2(String msg) {
271+
System.out.println("TopicListener Student 接收到消息:" + msg);
272+
}
273+
}
274+
```
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package cn.tellsea.mq;
2+
3+
import org.springframework.amqp.rabbit.annotation.Exchange;
4+
import org.springframework.amqp.rabbit.annotation.Queue;
5+
import org.springframework.amqp.rabbit.annotation.QueueBinding;
6+
import org.springframework.amqp.rabbit.annotation.RabbitListener;
7+
import org.springframework.stereotype.Component;
8+
9+
/**
10+
* Direct:定向,把消息交给符合指定routing key 的队列
11+
*/
12+
@Component
13+
public class DirectListener {
14+
15+
@RabbitListener(bindings = @QueueBinding(
16+
value = @Queue(value = "spring.direct.queue", durable = "true"),
17+
exchange = @Exchange(
18+
value = "spring.direct.exchange",
19+
ignoreDeclarationExceptions = "true"
20+
),
21+
key = {"direct"}
22+
))
23+
public void listen(String msg) {
24+
System.out.println("DirectListener listen 接收到消息:" + msg);
25+
}
26+
27+
// 队列2(第二个人),key值不同,接收不到消息
28+
@RabbitListener(bindings = @QueueBinding(
29+
value = @Queue(value = "spring.direct2.queue", durable = "true"),
30+
exchange = @Exchange(
31+
value = "spring.direct.exchange",
32+
ignoreDeclarationExceptions = "true"
33+
),
34+
key = {"direct-test"}
35+
))
36+
public void listen2(String msg) {
37+
System.out.println("DirectListener listen2 接收到消息:" + msg);
38+
}
39+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package cn.tellsea.mq;
2+
3+
import org.springframework.amqp.core.ExchangeTypes;
4+
import org.springframework.amqp.rabbit.annotation.Exchange;
5+
import org.springframework.amqp.rabbit.annotation.Queue;
6+
import org.springframework.amqp.rabbit.annotation.QueueBinding;
7+
import org.springframework.amqp.rabbit.annotation.RabbitListener;
8+
import org.springframework.stereotype.Component;
9+
10+
/**
11+
* Fanout:广播,将消息交给所有绑定到交换机的队列,每个消费者都会收到同一条消息
12+
*/
13+
@Component
14+
public class FanoutListener {
15+
16+
@RabbitListener(bindings = @QueueBinding(
17+
value = @Queue(value = "spring.fanout.queue", durable = "true"),
18+
exchange = @Exchange(
19+
value = "spring.fanout.exchange",
20+
ignoreDeclarationExceptions = "true",
21+
type = ExchangeTypes.FANOUT
22+
)
23+
))
24+
public void listen(String msg) {
25+
System.out.println("FanoutListener listen 接收到消息:" + msg);
26+
}
27+
28+
// 队列2(第二个人),同样能接收到消息
29+
@RabbitListener(bindings = @QueueBinding(
30+
value = @Queue(value = "spring.fanout2.queue", durable = "true"),
31+
exchange = @Exchange(
32+
value = "spring.fanout.exchange",
33+
ignoreDeclarationExceptions = "true",
34+
type = ExchangeTypes.FANOUT
35+
)
36+
))
37+
public void listen2(String msg) {
38+
System.out.println("FanoutListener listen2 接收到消息:" + msg);
39+
}
40+
}

0 commit comments

Comments
 (0)