Skip to content

Commit

Permalink
增加 spring cloud stream rabbitmq 示例
Browse files Browse the repository at this point in the history
  • Loading branch information
YunaiV committed Mar 7, 2020
1 parent 231102c commit 3a7551f
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class Demo01Consumer {

private Logger logger = LoggerFactory.getLogger(getClass());

@StreamListener(MySink.DEMO01_INPUT)
public void onMessage(@Payload Demo01Message message) {
public void onMessage(@Payload List<Demo01Message> message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ spring:
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-input:
destination: DEMO-TOPIC-04 # 目的地。这里使用 RabbitMQ Exchange
destination: DEMO-TOPIC-01 # 目的地。这里使用 RabbitMQ Exchange
content-type: application/json # 内容格式。这里使用 JSON
group: demo01-consumer-group-DEMO-TOPIC-01 # 消费者分组
binder: rabbit001 # 设置使用的 Binder 名字
# Consumer 配置项,对应 ConsumerProperties 类
consumer:
batch-mode: true # 是否批量消费默认,默认为 false

server:
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class Demo01Controller {
private MySource mySource;

@GetMapping("/send_batch")
public boolean send() throws InterruptedException {
public boolean sendBatch() throws InterruptedException {
// 发送 3 条消息,每条中间间隔 10 秒
for (int i = 0; i < 3; i++) {
// 创建 Message
Expand All @@ -36,7 +36,7 @@ public boolean send() throws InterruptedException {
mySource.demo01Output().send(springMessage);

// 故意每条消息之间,隔离 10 秒
logger.info("[testSyncSend][发送编号:[{}] 发送成功]", message.getId());
logger.info("[sendBatch][发送编号:[{}] 发送成功]", message.getId());
Thread.sleep(10 * 1000L);
}
return true;
Expand Down

0 comments on commit 3a7551f

Please sign in to comment.