Skip to content

Commit f9b6a6a

Browse files
author
YunaiV
committed
增加 spring cloud stream kafka 示例
1 parent ba4fa4f commit f9b6a6a

File tree

15 files changed

+369
-13
lines changed

15 files changed

+369
-13
lines changed

README.md

+23-13
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
* [《Spring Cloud Alibaba 专栏》](https://github.com/YunaiV/SpringBoot-Labs#spring-cloud-alibaba-%E4%B8%93%E6%A0%8F)
44
* [《Spring Cloud 专栏》](https://github.com/YunaiV/SpringBoot-Labs#spring-cloud-%E4%B8%93%E6%A0%8F)
55
* [《Dubbo 专栏》](https://github.com/YunaiV/SpringBoot-Labs#Dubbo-%E4%B8%93%E6%A0%8F)
6+
* [《消息队列 MQ 专栏》](TODO)
67

78
作为一个热爱**深夜**撸码的 18 岁头发茂密的可爱小男孩,希望大佬能够**一键三连**
89

@@ -43,6 +44,8 @@
4344
* [《芋道 Spring Boot API 接口文档 Swagger 入门》](http://www.iocoder.cn/Spring-Boot/Swagger/?github) 对应 [lab-24](https://github.com/YunaiV/SpringBoot-Labs/tree/master/lab-24)
4445
* [《芋道 Spring Boot 参数校验 Validation 入门》](http://www.iocoder.cn/Spring-Boot/Validation/?github) 对应 [lab-22](https://github.com/YunaiV/SpringBoot-Labs/tree/master/lab-22)
4546
* [《芋道 Spring Boot WebSocket 入门》](http://www.iocoder.cn/Spring-Boot/WebSocket/?github) 对应 [lab-25](https://github.com/YunaiV/SpringBoot-Labs/tree/master/lab-25)
47+
* [《性能测试 —— Tomcat、Jetty、Undertow 基准测试》](http://www.iocoder.cn/Performance-Testing/Tomcat-Jetty-Undertow-benchmark/?github) 对应 [lab-05](https://github.com/YunaiV/SpringBoot-Labs/tree/master/lab-05)
48+
* [《性能测试 —— SpringMVC、Webflux 基准测试》](http://www.iocoder.cn/Performance-Testing/SpringMVC-Webflux-benchmark/?github) 对应 [lab-06](https://github.com/YunaiV/SpringBoot-Labs/tree/master/lab-06)
4649

4750
## RPC 开发
4851

@@ -125,12 +128,6 @@
125128
* 《芋道 Spring Boot 链路追踪 Pinpoint 入门》计划中...
126129
* 《芋道 Spring Boot 链路追踪 Elastic APM 入门》计划中...
127130

128-
## 性能测试
129-
130-
* [《性能测试 —— Tomcat、Jetty、Undertow 基准测试》](http://www.iocoder.cn/Performance-Testing/Tomcat-Jetty-Undertow-benchmark/?github) 对应 [lab-05](https://github.com/YunaiV/SpringBoot-Labs/tree/master/lab-05)
131-
* [《性能测试 —— SpringMVC、Webflux 基准测试》](http://www.iocoder.cn/Performance-Testing/SpringMVC-Webflux-benchmark/?github) 对应 [lab-06](https://github.com/YunaiV/SpringBoot-Labs/tree/master/lab-06)
132-
* [《性能测试 —— Spring Cloud Gateway、Zuul 基准测试》](http://www.iocoder.cn/Performance-Testing/SpringCloudGateway-Zuul-benchmark/?github) 对应 [lab-07](https://github.com/YunaiV/SpringBoot-Labs/tree/master/lab-07)
133-
134131
# Spring Cloud Alibaba 专栏
135132

136133
## Spring Cloud Alibaba 全家桶
@@ -172,18 +169,19 @@
172169

173170
* [《芋道 Spring Cloud 服务网关 Spring Cloud Gateway 入门》](http://www.iocoder.cn/Spring-Cloud/Spring-Cloud-Gateway/?github) 对应 [labx-08](https://github.com/YunaiV/SpringBoot-Labs/tree/master/labx-08)
174171
* 《芋道 Spring Cloud Netflix 服务网关 Zuul 入门》
172+
* [《性能测试 —— Spring Cloud Gateway、Zuul 基准测试》](http://www.iocoder.cn/Performance-Testing/SpringCloudGateway-Zuul-benchmark/?github) 对应 [lab-07](https://github.com/YunaiV/SpringBoot-Labs/tree/master/lab-07)
175173

176174
## 配置中心
177175

178176
* [《芋道 Spring Cloud Alibaba 配置中心 Nacos 入门》](http://www.iocoder.cn/Spring-Cloud-Alibaba/Nacos-Config/?github) 对应 [labx-05](https://github.com/YunaiV/SpringBoot-Labs/tree/master/labx-05)
179177
* [《芋道 Spring Cloud 配置中心 Apollo 入门》](http://www.iocoder.cn/Spring-Cloud/Apollo/?github) 对应 [labx-09](https://github.com/YunaiV/SpringBoot-Labs/tree/master/labx-09)
180178

181-
182179
## 消息队列
183180

184181
* [《芋道 Spring Cloud Alibaba 消息队列 RocketMQ 入门》](http://www.iocoder.cn/Spring-Cloud-Alibaba/RocketMQ/?github) 对应 [labx-06](https://github.com/YunaiV/SpringBoot-Labs/tree/master/labx-06)
185-
* [《芋道 Spring Cloud 消息队列 RabbitMQ 入门》](http://www.iocoder.cn/Spring-Cloud/RabbitMQ/?github) 对应 [lab10-06](https://github.com/YunaiV/SpringBoot-Labs/tree/master/labx-10)
186-
* 《芋道 Spring Cloud 消息队列 Kafka 入门》
182+
* [《芋道 Spring Cloud 消息队列 RabbitMQ 入门》](http://www.iocoder.cn/Spring-Cloud/RabbitMQ/?github) 对应 [labx-10](https://github.com/YunaiV/SpringBoot-Labs/tree/master/labx-10)
183+
* [《芋道 Spring Cloud 消息队列 Kafka 入门》](http://www.iocoder.cn/Spring-Cloud/Kafka/?github) 对应 [labx-11](https://github.com/YunaiV/SpringBoot-Labs/tree/master/labx-11)
184+
* 《芋道 Spring Cloud 消息队列 ActiveMQ 入门》
187185

188186
## 链路追踪
189187

@@ -195,15 +193,27 @@
195193
* [《芋道 Spring Boot Dubbo 入门》](http://www.iocoder.cn/Spring-Boot/Dubbo/?github) 对应 [lab-30](https://github.com/YunaiV/SpringBoot-Labs/tree/master/lab-30)
196194
* [《芋道 Spring Cloud Alibaba 服务调用 Dubbo 入门》](http://www.iocoder.cn/Spring-Cloud-Alibaba/Dubbo/?github) 对应 [labx-07](https://github.com/YunaiV/SpringBoot-Labs/tree/master/labx-07)
197195

198-
---------
196+
# 消息队列 MQ 专栏
199197

200-
如下是草稿目录,未来需要整理下
198+
## RocketMQ
201199

202-
# lab-05
200+
TODO
201+
202+
## RabbitMQ
203+
204+
TODO
205+
206+
## Kafka
203207

204208
TODO
205209

206-
WEB 容器,后续优化掉
210+
## ActiveMQ
211+
212+
TODO
213+
214+
---------
215+
216+
如下是草稿目录,未来需要整理下
207217

208218
# lab-08
209219

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>labx-10</artifactId>
7+
<groupId>cn.iocoder.springboot.labs</groupId>
8+
<version>1.0-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>labx-11-sc-stream-kafka-consumer-demo</artifactId>
13+
14+
<properties>
15+
<maven.compiler.target>1.8</maven.compiler.target>
16+
<maven.compiler.source>1.8</maven.compiler.source>
17+
<spring.boot.version>2.2.4.RELEASE</spring.boot.version>
18+
<spring.cloud.version>Hoxton.SR1</spring.cloud.version>
19+
</properties>
20+
21+
<!--
22+
引入 Spring Boot、Spring Cloud、Spring Cloud Alibaba 三者 BOM 文件,进行依赖版本的管理,防止不兼容。
23+
在 https://dwz.cn/mcLIfNKt 文章中,Spring Cloud Alibaba 开发团队推荐了三者的依赖关系
24+
-->
25+
<dependencyManagement>
26+
<dependencies>
27+
<dependency>
28+
<groupId>org.springframework.boot</groupId>
29+
<artifactId>spring-boot-starter-parent</artifactId>
30+
<version>${spring.boot.version}</version>
31+
<type>pom</type>
32+
<scope>import</scope>
33+
</dependency>
34+
<dependency>
35+
<groupId>org.springframework.cloud</groupId>
36+
<artifactId>spring-cloud-dependencies</artifactId>
37+
<version>${spring.cloud.version}</version>
38+
<type>pom</type>
39+
<scope>import</scope>
40+
</dependency>
41+
</dependencies>
42+
</dependencyManagement>
43+
44+
<dependencies>
45+
<!-- 引入 SpringMVC 相关依赖,并实现对其的自动配置 -->
46+
<dependency>
47+
<groupId>org.springframework.boot</groupId>
48+
<artifactId>spring-boot-starter-web</artifactId>
49+
</dependency>
50+
51+
<!-- 引入 Spring Cloud Stream Kafka 相关依赖,将 Kafka 作为消息队列,并实现对其的自动配置 -->
52+
<dependency>
53+
<groupId>org.springframework.cloud</groupId>
54+
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
55+
</dependency>
56+
</dependencies>
57+
58+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package cn.iocoder.springcloud.labx11.kafkademo.consumerdemo;
2+
3+
import cn.iocoder.springcloud.labx11.kafkademo.consumerdemo.listener.MySink;
4+
import org.springframework.boot.SpringApplication;
5+
import org.springframework.boot.autoconfigure.SpringBootApplication;
6+
import org.springframework.cloud.stream.annotation.EnableBinding;
7+
8+
@SpringBootApplication
9+
@EnableBinding(MySink.class)
10+
public class ConsumerApplication {
11+
12+
public static void main(String[] args) {
13+
SpringApplication.run(ConsumerApplication.class, args);
14+
}
15+
16+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package cn.iocoder.springcloud.labx11.kafkademo.consumerdemo.listener;
2+
3+
import cn.iocoder.springcloud.labx11.kafkademo.consumerdemo.message.Demo01Message;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
import org.springframework.cloud.stream.annotation.StreamListener;
7+
import org.springframework.messaging.handler.annotation.Payload;
8+
import org.springframework.stereotype.Component;
9+
10+
@Component
11+
public class Demo01Consumer {
12+
13+
private Logger logger = LoggerFactory.getLogger(getClass());
14+
15+
@StreamListener(MySink.DEMO01_INPUT)
16+
public void onMessage(@Payload Demo01Message message) {
17+
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
18+
}
19+
20+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package cn.iocoder.springcloud.labx11.kafkademo.consumerdemo.listener;
2+
3+
import org.springframework.cloud.stream.annotation.Input;
4+
import org.springframework.messaging.SubscribableChannel;
5+
6+
public interface MySink {
7+
8+
String DEMO01_INPUT = "demo01-input";
9+
10+
@Input(DEMO01_INPUT)
11+
SubscribableChannel demo01Input();
12+
13+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package cn.iocoder.springcloud.labx11.kafkademo.consumerdemo.message;
2+
3+
/**
4+
* 示例 01 的 Message 消息
5+
*/
6+
public class Demo01Message {
7+
8+
/**
9+
* 编号
10+
*/
11+
private Integer id;
12+
13+
public Demo01Message setId(Integer id) {
14+
this.id = id;
15+
return this;
16+
}
17+
18+
public Integer getId() {
19+
return id;
20+
}
21+
22+
@Override
23+
public String toString() {
24+
return "Demo01Message{" +
25+
"id=" + id +
26+
'}';
27+
}
28+
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
spring:
2+
application:
3+
name: demo-consumer-application
4+
cloud:
5+
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
6+
stream:
7+
# Binder 配置项,对应 BinderProperties Map
8+
# binders:
9+
# Binding 配置项,对应 BindingProperties Map
10+
bindings:
11+
demo01-input:
12+
destination: DEMO-TOPIC-01 # 目的地。这里使用 Kafka Topic
13+
content-type: application/json # 内容格式。这里使用 JSON
14+
group: demo01-consumer-group # 消费者分组
15+
kafka:
16+
binder:
17+
brokers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
18+
19+
server:
20+
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>labx-10</artifactId>
7+
<groupId>cn.iocoder.springboot.labs</groupId>
8+
<version>1.0-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>labx-11-sc-stream-kafka-producer-demo</artifactId>
13+
14+
<properties>
15+
<maven.compiler.target>1.8</maven.compiler.target>
16+
<maven.compiler.source>1.8</maven.compiler.source>
17+
<spring.boot.version>2.2.4.RELEASE</spring.boot.version>
18+
<spring.cloud.version>Hoxton.SR1</spring.cloud.version>
19+
</properties>
20+
21+
<!--
22+
引入 Spring Boot、Spring Cloud、Spring Cloud Alibaba 三者 BOM 文件,进行依赖版本的管理,防止不兼容。
23+
在 https://dwz.cn/mcLIfNKt 文章中,Spring Cloud Alibaba 开发团队推荐了三者的依赖关系
24+
-->
25+
<dependencyManagement>
26+
<dependencies>
27+
<dependency>
28+
<groupId>org.springframework.boot</groupId>
29+
<artifactId>spring-boot-starter-parent</artifactId>
30+
<version>${spring.boot.version}</version>
31+
<type>pom</type>
32+
<scope>import</scope>
33+
</dependency>
34+
<dependency>
35+
<groupId>org.springframework.cloud</groupId>
36+
<artifactId>spring-cloud-dependencies</artifactId>
37+
<version>${spring.cloud.version}</version>
38+
<type>pom</type>
39+
<scope>import</scope>
40+
</dependency>
41+
</dependencies>
42+
</dependencyManagement>
43+
44+
<dependencies>
45+
<!-- 引入 SpringMVC 相关依赖,并实现对其的自动配置 -->
46+
<dependency>
47+
<groupId>org.springframework.boot</groupId>
48+
<artifactId>spring-boot-starter-web</artifactId>
49+
</dependency>
50+
51+
<!-- 引入 Spring Cloud Stream Kafka 相关依赖,将 Kafka 作为消息队列,并实现对其的自动配置 -->
52+
<dependency>
53+
<groupId>org.springframework.cloud</groupId>
54+
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
55+
</dependency>
56+
</dependencies>
57+
58+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package cn.iocoder.springcloud.labx11.kafkademo.kafkademo;
2+
3+
import cn.iocoder.springcloud.labx11.kafkademo.kafkademo.message.MySource;
4+
import org.springframework.boot.SpringApplication;
5+
import org.springframework.boot.autoconfigure.SpringBootApplication;
6+
import org.springframework.cloud.stream.annotation.EnableBinding;
7+
8+
@SpringBootApplication
9+
@EnableBinding(MySource.class)
10+
public class ProducerApplication {
11+
12+
public static void main(String[] args) {
13+
SpringApplication.run(ProducerApplication.class, args);
14+
}
15+
16+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package cn.iocoder.springcloud.labx11.kafkademo.kafkademo.controller;
2+
3+
import cn.iocoder.springcloud.labx11.kafkademo.kafkademo.message.Demo01Message;
4+
import cn.iocoder.springcloud.labx11.kafkademo.kafkademo.message.MySource;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
import org.springframework.beans.factory.annotation.Autowired;
8+
import org.springframework.messaging.Message;
9+
import org.springframework.messaging.support.MessageBuilder;
10+
import org.springframework.web.bind.annotation.GetMapping;
11+
import org.springframework.web.bind.annotation.RequestMapping;
12+
import org.springframework.web.bind.annotation.RestController;
13+
14+
import java.util.Random;
15+
16+
@RestController
17+
@RequestMapping("/demo01")
18+
public class Demo01Controller {
19+
20+
private Logger logger = LoggerFactory.getLogger(getClass());
21+
22+
@Autowired
23+
private MySource mySource;
24+
25+
@GetMapping("/send")
26+
public boolean send() {
27+
// 创建 Message
28+
Demo01Message message = new Demo01Message()
29+
.setId(new Random().nextInt());
30+
// 创建 Spring Message 对象
31+
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
32+
.build();
33+
// 发送消息
34+
return mySource.demo01Output().send(springMessage);
35+
}
36+
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package cn.iocoder.springcloud.labx11.kafkademo.kafkademo.message;
2+
3+
/**
4+
* 示例 01 的 Message 消息
5+
*/
6+
public class Demo01Message {
7+
8+
/**
9+
* 编号
10+
*/
11+
private Integer id;
12+
13+
public Demo01Message setId(Integer id) {
14+
this.id = id;
15+
return this;
16+
}
17+
18+
public Integer getId() {
19+
return id;
20+
}
21+
22+
@Override
23+
public String toString() {
24+
return "Demo01Message{" +
25+
"id=" + id +
26+
'}';
27+
}
28+
29+
}

0 commit comments

Comments
 (0)