Skip to content

Commit

Permalink
增加 spring cloud stream rabbitmq 示例(消息确认,完成乱炖)
Browse files Browse the repository at this point in the history
  • Loading branch information
YunaiV committed Mar 7, 2020
1 parent fd39615 commit 231102c
Show file tree
Hide file tree
Showing 13 changed files with 363 additions and 0 deletions.
58 changes: 58 additions & 0 deletions labx-10/labx-10-sc-stream-rabbitmq-consumer-batch/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>labx-10</artifactId>
<groupId>cn.iocoder.springboot.labs</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>labx-10-sc-stream-rabbitmq-consumer-batch</artifactId>

<properties>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
<spring.boot.version>2.2.4.RELEASE</spring.boot.version>
<spring.cloud.version>Hoxton.SR1</spring.cloud.version>
</properties>

<!--
引入 Spring Boot、Spring Cloud、Spring Cloud Alibaba 三者 BOM 文件,进行依赖版本的管理,防止不兼容。
在 https://dwz.cn/mcLIfNKt 文章中,Spring Cloud Alibaba 开发团队推荐了三者的依赖关系
-->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring.cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!-- 引入 SpringMVC 相关依赖,并实现对其的自动配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- 引入 Spring Cloud Stream RabbitMQ 相关依赖,将 RabbitMQ 作为消息队列,并实现对其的自动配置 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package cn.iocoder.springcloud.labx10.rabbitmqdemo.consumerdemo;

import cn.iocoder.springcloud.labx10.rabbitmqdemo.consumerdemo.listener.MySink;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableBinding(MySink.class)
public class ConsumerApplication {

public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package cn.iocoder.springcloud.labx10.rabbitmqdemo.consumerdemo.listener;

import cn.iocoder.springcloud.labx10.rabbitmqdemo.consumerdemo.message.Demo01Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class Demo01Consumer {

private Logger logger = LoggerFactory.getLogger(getClass());

@StreamListener(MySink.DEMO01_INPUT)
public void onMessage(@Payload Demo01Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package cn.iocoder.springcloud.labx10.rabbitmqdemo.consumerdemo.listener;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MySink {

String DEMO01_INPUT = "demo01-input";

@Input(DEMO01_INPUT)
SubscribableChannel demo01Input();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package cn.iocoder.springcloud.labx10.rabbitmqdemo.consumerdemo.message;

/**
* 示例 01 的 Message 消息
*/
public class Demo01Message {

/**
* 编号
*/
private Integer id;

public Demo01Message setId(Integer id) {
this.id = id;
return this;
}

public Integer getId() {
return id;
}

@Override
public String toString() {
return "Demo01Message{" +
"id=" + id +
'}';
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
spring:
application:
name: demo-consumer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
binders:
rabbit001:
type: rabbit # 设置 Binder 的类型
environment: # 设置 Binder 的环境配置
# 如果是 RabbitMQ 类型的时候,则对应的是 RabbitProperties 类
spring:
rabbitmq:
host: 127.0.0.1 # RabbitMQ 服务的地址
port: 5672 # RabbitMQ 服务的端口
username: guest # RabbitMQ 服务的账号
password: guest # RabbitMQ 服务的密码
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-input:
destination: DEMO-TOPIC-04 # 目的地。这里使用 RabbitMQ Exchange
content-type: application/json # 内容格式。这里使用 JSON
group: demo01-consumer-group-DEMO-TOPIC-01 # 消费者分组
binder: rabbit001 # 设置使用的 Binder 名字

server:
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者
58 changes: 58 additions & 0 deletions labx-10/labx-10-sc-stream-rabbitmq-producer-batch/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>labx-10</artifactId>
<groupId>cn.iocoder.springboot.labs</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>labx-10-sc-stream-rabbitmq-producer-batch</artifactId>

<properties>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
<spring.boot.version>2.2.4.RELEASE</spring.boot.version>
<spring.cloud.version>Hoxton.SR1</spring.cloud.version>
</properties>

<!--
引入 Spring Boot、Spring Cloud、Spring Cloud Alibaba 三者 BOM 文件,进行依赖版本的管理,防止不兼容。
在 https://dwz.cn/mcLIfNKt 文章中,Spring Cloud Alibaba 开发团队推荐了三者的依赖关系
-->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring.cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!-- 引入 SpringMVC 相关依赖,并实现对其的自动配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- 引入 Spring Cloud Stream RabbitMQ 相关依赖,将 RabbitMQ 作为消息队列,并实现对其的自动配置 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package cn.iocoder.springcloud.labx10.rabbitmqdemo.producerdemo;

import cn.iocoder.springcloud.labx10.rabbitmqdemo.producerdemo.message.MySource;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableBinding(MySource.class)
public class ProducerApplication {

public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package cn.iocoder.springcloud.labx10.rabbitmqdemo.producerdemo.controller;

import cn.iocoder.springcloud.labx10.rabbitmqdemo.producerdemo.message.Demo01Message;
import cn.iocoder.springcloud.labx10.rabbitmqdemo.producerdemo.message.MySource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Random;

@RestController
@RequestMapping("/demo01")
public class Demo01Controller {

private Logger logger = LoggerFactory.getLogger(getClass());

@Autowired
private MySource mySource;

@GetMapping("/send_batch")
public boolean send() throws InterruptedException {
// 发送 3 条消息,每条中间间隔 10 秒
for (int i = 0; i < 3; i++) {
// 创建 Message
Demo01Message message = new Demo01Message()
.setId(new Random().nextInt());
// 创建 Spring Message 对象
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.build();
// 发送消息
mySource.demo01Output().send(springMessage);

// 故意每条消息之间,隔离 10 秒
logger.info("[testSyncSend][发送编号:[{}] 发送成功]", message.getId());
Thread.sleep(10 * 1000L);
}
return true;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package cn.iocoder.springcloud.labx10.rabbitmqdemo.producerdemo.message;

/**
* 示例 01 的 Message 消息
*/
public class Demo01Message {

/**
* 编号
*/
private Integer id;

public Demo01Message setId(Integer id) {
this.id = id;
return this;
}

public Integer getId() {
return id;
}

@Override
public String toString() {
return "Demo01Message{" +
"id=" + id +
'}';
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package cn.iocoder.springcloud.labx10.rabbitmqdemo.producerdemo.message;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface MySource {

@Output("demo01-output")
MessageChannel demo01Output();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
spring:
application:
name: demo-producer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
binders:
rabbit001:
type: rabbit # 设置 Binder 的类型
environment: # 设置 Binder 的环境配置
# 如果是 RabbitMQ 类型的时候,则对应的是 RabbitProperties 类
spring:
rabbitmq:
host: 127.0.0.1 # RabbitMQ 服务的地址
port: 5672 # RabbitMQ 服务的端口
username: guest # RabbitMQ 服务的账号
password: guest # RabbitMQ 服务的密码
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-output:
destination: DEMO-TOPIC-01 # 目的地。这里使用 RabbitMQ Exchange
content-type: application/json # 内容格式。这里使用 JSON
binder: rabbit001 # 设置使用的 Binder 名字
# RabbitMQ 自定义 Binding 配置项,对应 RabbitBindingProperties Map
rabbit:
bindings:
demo01-output:
# RabbitMQ Producer 配置项,对应 RabbitProducerProperties 类
producer:
batching-enabled: true # 是否开启批量发送功能,默认为 false
batch-size: 100 # 超过收集的消息数量的最大条数,默认为 100
batch-buffer-limit: 10000 # 每次批量发送消息的最大内存,默认为 10000
batch-timeout: 30000 # 超过收集的时间的最大等待时长,单位:毫秒,默认为 5000

server:
port: 18080
3 changes: 3 additions & 0 deletions labx-10/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@

<module>labx-10-sc-stream-rabbitmq-producer-confirm</module>
<!--搭配 <module>labx-10-sc-stream-rabbitmq-consumer-demo</module> 作为消费者-->

<module>labx-10-sc-stream-rabbitmq-producer-batch</module>
<module>labx-10-sc-stream-rabbitmq-consumer-batch</module>
</modules>


Expand Down

0 comments on commit 231102c

Please sign in to comment.