Skip to content

Commit 7e9a6eb

Browse files
author
YunaiV
committed
增加 rabbitmq 消费重试
1 parent f311040 commit 7e9a6eb

File tree

10 files changed

+236
-0
lines changed

10 files changed

+236
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<groupId>org.springframework.boot</groupId>
7+
<artifactId>spring-boot-starter-parent</artifactId>
8+
<version>2.2.1.RELEASE</version>
9+
<relativePath/> <!-- lookup parent from repository -->
10+
</parent>
11+
<modelVersion>4.0.0</modelVersion>
12+
13+
<artifactId>lab-04-rabbitmq-consume-retry</artifactId>
14+
15+
<dependencies>
16+
<!-- 实现对 RabbitMQ 的自动化配置 -->
17+
<dependency>
18+
<groupId>org.springframework.boot</groupId>
19+
<artifactId>spring-boot-starter-amqp</artifactId>
20+
</dependency>
21+
22+
<!-- 方便等会写单元测试 -->
23+
<dependency>
24+
<groupId>org.springframework.boot</groupId>
25+
<artifactId>spring-boot-starter-test</artifactId>
26+
<scope>test</scope>
27+
</dependency>
28+
</dependencies>
29+
30+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package cn.iocoder.springboot.lab04.rabbitmqdemo;
2+
3+
import org.springframework.boot.SpringApplication;
4+
import org.springframework.boot.autoconfigure.SpringBootApplication;
5+
6+
@SpringBootApplication
7+
public class Application {
8+
9+
public static void main(String[] args) {
10+
SpringApplication.run(Application.class, args);
11+
}
12+
13+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package cn.iocoder.springboot.lab04.rabbitmqdemo.config;
2+
3+
import cn.iocoder.springboot.lab04.rabbitmqdemo.message.Demo07Message;
4+
import org.springframework.amqp.core.*;
5+
import org.springframework.context.annotation.Bean;
6+
import org.springframework.context.annotation.Configuration;
7+
8+
@Configuration
9+
public class RabbitConfig {
10+
11+
/**
12+
* Direct Exchange 示例的配置类
13+
*/
14+
public static class DirectExchangeDemoConfiguration {
15+
16+
// 创建 Queue
17+
@Bean
18+
public Queue demo07Queue() {
19+
return QueueBuilder.durable(Demo07Message.QUEUE) // durable: 是否持久化
20+
.exclusive() // exclusive: 是否排它
21+
.autoDelete() // autoDelete: 是否自动删除
22+
.deadLetterExchange(Demo07Message.EXCHANGE)
23+
.deadLetterRoutingKey(Demo07Message.DEAD_ROUTING_KEY)
24+
.build();
25+
}
26+
27+
// 创建 Dead Queue
28+
@Bean
29+
public Queue demo07DeadQueue() {
30+
return new Queue(Demo07Message.DEAD_QUEUE, // Queue 名字
31+
true, // durable: 是否持久化
32+
false, // exclusive: 是否排它
33+
false); // autoDelete: 是否自动删除
34+
}
35+
36+
// 创建 Direct Exchange
37+
@Bean
38+
public DirectExchange demo07Exchange() {
39+
return new DirectExchange(Demo07Message.EXCHANGE,
40+
true, // durable: 是否持久化
41+
false); // exclusive: 是否排它
42+
}
43+
44+
// 创建 Binding
45+
// Exchange:Demo07Message.EXCHANGE
46+
// Routing key:Demo07Message.ROUTING_KEY
47+
// Queue:Demo07Message.QUEUE
48+
@Bean
49+
public Binding demo07Binding() {
50+
return BindingBuilder.bind(demo07Queue()).to(demo07Exchange()).with(Demo07Message.ROUTING_KEY);
51+
}
52+
53+
// 创建 Dead Binding
54+
// Exchange:Demo07Message.EXCHANGE
55+
// Routing key:Demo07Message.DEAD_ROUTING_KEY
56+
// Queue:Demo07Message.DEAD_QUEUE
57+
@Bean
58+
public Binding demo07DeadBinding() {
59+
return BindingBuilder.bind(demo07DeadQueue()).to(demo07Exchange()).with(Demo07Message.DEAD_ROUTING_KEY);
60+
}
61+
62+
}
63+
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package cn.iocoder.springboot.lab04.rabbitmqdemo.consumer;
2+
3+
import cn.iocoder.springboot.lab04.rabbitmqdemo.message.Demo07Message;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
7+
import org.springframework.amqp.rabbit.annotation.RabbitListener;
8+
import org.springframework.stereotype.Component;
9+
10+
@Component
11+
@RabbitListener(queues = Demo07Message.QUEUE)
12+
public class Demo07Consumer {
13+
14+
private Logger logger = LoggerFactory.getLogger(getClass());
15+
16+
@RabbitHandler
17+
public void onMessage(Demo07Message message) {
18+
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
19+
// <X> 注意,此处抛出一个 RuntimeException 异常,模拟消费失败
20+
throw new RuntimeException("我就是故意抛出一个异常");
21+
}
22+
23+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package cn.iocoder.springboot.lab04.rabbitmqdemo.message;
2+
3+
import java.io.Serializable;
4+
5+
public class Demo07Message implements Serializable {
6+
7+
public static final String QUEUE = "QUEUE_DEMO_07"; // 正常队列
8+
public static final String DEAD_QUEUE = "DEAD_QUEUE_DEMO_07"; // 死信队列
9+
10+
public static final String EXCHANGE = "EXCHANGE_DEMO_07";
11+
12+
public static final String ROUTING_KEY = "ROUTING_KEY_07"; // 正常路由键
13+
public static final String DEAD_ROUTING_KEY = "DEAD_ROUTING_KEY_07"; // 死信路由键
14+
15+
16+
/**
17+
* 编号
18+
*/
19+
private Integer id;
20+
21+
public Demo07Message setId(Integer id) {
22+
this.id = id;
23+
return this;
24+
}
25+
26+
public Integer getId() {
27+
return id;
28+
}
29+
30+
@Override
31+
public String toString() {
32+
return "Demo07Message{" +
33+
"id=" + id +
34+
'}';
35+
}
36+
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package cn.iocoder.springboot.lab04.rabbitmqdemo.producer;
2+
3+
import cn.iocoder.springboot.lab04.rabbitmqdemo.message.Demo07Message;
4+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
5+
import org.springframework.beans.factory.annotation.Autowired;
6+
import org.springframework.stereotype.Component;
7+
8+
@Component
9+
public class Demo07Producer {
10+
11+
@Autowired
12+
private RabbitTemplate rabbitTemplate;
13+
14+
public void syncSend(Integer id) {
15+
// 创建 Demo07Message 消息
16+
Demo07Message message = new Demo07Message();
17+
message.setId(id);
18+
// 同步发送消息
19+
rabbitTemplate.convertAndSend(Demo07Message.EXCHANGE, Demo07Message.ROUTING_KEY, message);
20+
}
21+
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
spring:
2+
# RabbitMQ 配置项,对应 RabbitProperties 配置类
3+
rabbitmq:
4+
host: 127.0.0.1 # RabbitMQ 服务的地址
5+
port: 5672 # RabbitMQ 服务的端口
6+
username: guest # RabbitMQ 服务的账号
7+
password: guest # RabbitMQ 服务的密码
8+
listener:
9+
simple:
10+
retry:
11+
enabled: true
12+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package cn.iocoder.springboot.lab04.rabbitmqdemo;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package cn.iocoder.springboot.lab04.rabbitmqdemo.producer;
2+
3+
import cn.iocoder.springboot.lab04.rabbitmqdemo.Application;
4+
import org.junit.Test;
5+
import org.junit.runner.RunWith;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
import org.springframework.beans.factory.annotation.Autowired;
9+
import org.springframework.boot.test.context.SpringBootTest;
10+
import org.springframework.test.context.junit4.SpringRunner;
11+
12+
import java.util.concurrent.CountDownLatch;
13+
14+
@RunWith(SpringRunner.class)
15+
@SpringBootTest(classes = Application.class)
16+
public class Demo07ProducerTest {
17+
18+
private Logger logger = LoggerFactory.getLogger(getClass());
19+
20+
@Autowired
21+
private Demo07Producer producer;
22+
23+
@Test
24+
public void testSyncSend() throws InterruptedException {
25+
int id = (int) (System.currentTimeMillis() / 1000);
26+
producer.syncSend(id);
27+
logger.info("[testSyncSend][发送编号:[{}] 发送成功]", id);
28+
29+
// 阻塞等待,保证消费
30+
new CountDownLatch(1).await();
31+
}
32+
33+
}

lab-04/pom.xml

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
<module>lab-04-rabbitmq-demo-batch</module>
1818
<module>lab-04-rabbitmq-demo-batch-consume</module>
1919
<module>lab-04-rabbitmq-demo-batch-consume-02</module>
20+
<module>lab-04-rabbitmq-consume-retry</module>
2021
</modules>
2122

2223
<dependencies>

0 commit comments

Comments
 (0)