Skip to content

Commit 85271f8

Browse files
author
YunaiV
committed
增加 spring boot kafka 示例
1 parent 6e94334 commit 85271f8

File tree

13 files changed

+92
-290
lines changed

13 files changed

+92
-290
lines changed

lab-03/lab-03-kafka-demo-transaction/src/main/java/cn/iocoder/springboot/lab03/kafkademo/config/KafkaConfiguration.java

Lines changed: 0 additions & 37 deletions
This file was deleted.

lab-03/lab-03-kafka-demo-transaction/src/main/java/cn/iocoder/springboot/lab03/kafkademo/consumer/Demo01AConsumer.java

Lines changed: 0 additions & 30 deletions
This file was deleted.

lab-03/lab-03-kafka-demo-transaction/src/main/java/cn/iocoder/springboot/lab03/kafkademo/consumer/Demo04Consumer.java

Lines changed: 0 additions & 26 deletions
This file was deleted.
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
package cn.iocoder.springboot.lab03.kafkademo.consumer;
22

3-
import cn.iocoder.springboot.lab03.kafkademo.message.Demo01Message;
3+
import cn.iocoder.springboot.lab03.kafkademo.message.Demo07Message;
44
import org.slf4j.Logger;
55
import org.slf4j.LoggerFactory;
66
import org.springframework.kafka.annotation.KafkaListener;
77
import org.springframework.stereotype.Component;
88

99
@Component
10-
public class Demo01Consumer {
10+
public class Demo07Consumer {
1111

1212
private Logger logger = LoggerFactory.getLogger(getClass());
1313

14-
@KafkaListener(topics = Demo01Message.TOPIC,
15-
groupId = "demo01-consumer-group-" + Demo01Message.TOPIC)
16-
public void onMessage(Demo01Message message) {
14+
@KafkaListener(topics = Demo07Message.TOPIC,
15+
groupId = "demo07-consumer-group-" + Demo07Message.TOPIC)
16+
public void onMessage(Demo07Message message) {
1717
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
1818
}
1919

lab-03/lab-03-kafka-demo-transaction/src/main/java/cn/iocoder/springboot/lab03/kafkademo/message/Demo04Message.java

Lines changed: 0 additions & 31 deletions
This file was deleted.
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
package cn.iocoder.springboot.lab03.kafkademo.message;
22

33
/**
4-
* 示例 01 的 Message 消息
4+
* 示例 07 的 Message 消息
55
*/
6-
public class Demo01Message {
6+
public class Demo07Message {
77

8-
public static final String TOPIC = "DEMO_01";
8+
public static final String TOPIC = "DEMO_07";
99

1010
/**
1111
* 编号
1212
*/
1313
private Integer id;
1414

15-
public Demo01Message setId(Integer id) {
15+
public Demo07Message setId(Integer id) {
1616
this.id = id;
1717
return this;
1818
}
@@ -23,7 +23,7 @@ public Integer getId() {
2323

2424
@Override
2525
public String toString() {
26-
return "Demo01Message{" +
26+
return "Demo07Message{" +
2727
"id=" + id +
2828
'}';
2929
}

lab-03/lab-03-kafka-demo-transaction/src/main/java/cn/iocoder/springboot/lab03/kafkademo/producer/Demo01Producer.java

Lines changed: 0 additions & 34 deletions
This file was deleted.

lab-03/lab-03-kafka-demo-transaction/src/main/java/cn/iocoder/springboot/lab03/kafkademo/producer/Demo04Producer.java

Lines changed: 0 additions & 25 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package cn.iocoder.springboot.lab03.kafkademo.producer;
2+
3+
import cn.iocoder.springboot.lab03.kafkademo.message.Demo07Message;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
import org.springframework.kafka.core.KafkaOperations;
7+
import org.springframework.kafka.core.KafkaTemplate;
8+
import org.springframework.kafka.support.SendResult;
9+
import org.springframework.stereotype.Component;
10+
import org.springframework.util.concurrent.ListenableFuture;
11+
12+
import javax.annotation.Resource;
13+
import java.util.concurrent.ExecutionException;
14+
15+
@Component
16+
public class Demo07Producer {
17+
18+
private Logger logger = LoggerFactory.getLogger(getClass());
19+
20+
@Resource
21+
private KafkaTemplate<Object, Object> kafkaTemplate;
22+
23+
public SendResult syncSend(Integer id) throws ExecutionException, InterruptedException {
24+
// 创建 Demo07Message 消息
25+
Demo07Message message = new Demo07Message();
26+
message.setId(id);
27+
// 同步发送消息
28+
return kafkaTemplate.send(Demo07Message.TOPIC, message).get();
29+
}
30+
31+
public SendResult<Object, Object> syncSendInTransaction(Integer id, Runnable runner) throws ExecutionException, InterruptedException {
32+
return kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<Object, Object, SendResult<Object, Object>>() {
33+
34+
@Override
35+
public SendResult<Object, Object> doInOperations(KafkaOperations<Object, Object> kafkaOperations) {
36+
// 创建 Demo07Message 消息
37+
SendResult<Object, Object> sendResult;
38+
Demo07Message message = new Demo07Message();
39+
message.setId(id);
40+
try {
41+
// sendResult = kafkaOperations.send(Demo07Message.TOPIC, message).get();
42+
sendResult = kafkaOperations.send(Demo07Message.TOPIC, message).get();
43+
logger.info("[doInOperations][发送编号:[{}] 发送结果:[{}]]", id, sendResult);
44+
} catch (Exception e) {
45+
throw new RuntimeException(e);
46+
}
47+
48+
// 本地业务逻辑... biubiubiu
49+
runner.run();
50+
51+
// 返回发送结果
52+
return sendResult;
53+
}
54+
55+
});
56+
}
57+
58+
}

lab-03/lab-03-kafka-demo-transaction/src/main/resources/application.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@ spring:
44
bootstrap-servers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
55
# Kafka Producer 配置项
66
producer:
7-
acks: 1 # 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。
7+
acks: all # 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。
88
retries: 3 # 发送失败时,重试发送的次数
99
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 消息的 key 的序列化
1010
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 消息的 value 的序列化
11+
transaction-id-prefix: demo. # 事务编号前缀
1112
# Kafka Consumer 配置项
1213
consumer:
1314
auto-offset-reset: earliest # 设置消费者分组最初的消费进度为 earliest 。可参考博客 https://blog.csdn.net/lishuangzhe7047/article/details/74530417 理解
@@ -18,6 +19,8 @@ spring:
1819
json:
1920
trusted:
2021
packages: cn.iocoder.springboot.lab03.kafkademo.message
22+
isolation:
23+
level: read_committed # 读取已提交的消息
2124
# Kafka Consumer Listener 监听器配置
2225
listener:
2326
missing-topics-fatal: false # 消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错

0 commit comments

Comments
 (0)