Skip to content

Commit

Permalink
增加 spring cloud stream rabbitmq 示例(消息确认,完成乱炖)
Browse files Browse the repository at this point in the history
  • Loading branch information
YunaiV committed Mar 6, 2020
1 parent 8e80452 commit fd39615
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
Expand Down Expand Up @@ -35,33 +34,4 @@ public boolean send() {
return mySource.demo01Output().send(springMessage);
}

// @StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) // errorChannel
// public void globalHandleError(ErrorMessage errorMessage) {
// logger.error("[globalHandleError][payload:{}]", errorMessage.getPayload().getMessage());
// logger.error("[globalHandleError][originalMessage:{}]", errorMessage.getOriginalMessage());
// logger.error("[globalHandleError][headers:{}]", errorMessage.getHeaders());
// }

// @StreamListener("ooxx") // errorChannel
// public void ooxx(ErrorMessage errorMessage) {
// logger.error("[globalHandleError][payload:{}]", errorMessage.getPayload().getMessage());
// logger.error("[globalHandleError][originalMessage:{}]", errorMessage.getOriginalMessage());
// logger.error("[globalHandleError][headers:{}]", errorMessage.getHeaders());
// }

//// @ServiceActivator(inputChannel = "demo-producer-application.ooxx")
// @ServiceActivator(inputChannel = "demo-producer-application.ooxx")
//// @StreamListener("demo-producer-application.ooxx") // errorChannel
// public void handleError(Message<?> errorMessage) {
//// logger.error("[handleError][payload:{}]", errorMessage.getPayload().getMessage());
//// logger.error("[handleError][originalMessage:{}]", errorMessage.getOriginalMessage());
//// logger.error("[handleError][headers:{}]", errorMessage.getHeaders());
// System.out.println();
// }

@ServiceActivator(inputChannel = "publisher-confirm")
public void onPublisherConfirm(Message message) {
logger.debug("on publisher confirm");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package cn.iocoder.springcloud.labx10.rabbitmqdemo.producerdemo.message;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class Demo01ProducerConfirmCallback {

private Logger logger = LoggerFactory.getLogger(getClass());

@ServiceActivator(inputChannel = "demo01-producer-confirm")
public void onPublisherConfirm(Message message) {
logger.info("[onPublisherConfirm][headers:{}]", message.getHeaders());
logger.info("[onPublisherConfirm][payload:{}]", message.getPayload());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package cn.iocoder.springcloud.labx10.rabbitmqdemo.producerdemo.message;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Component;

@Component
public class Demo01ProducerReturnCallback {

private Logger logger = LoggerFactory.getLogger(getClass());

@ServiceActivator(inputChannel = "DEMO-TOPIC-01.errors")
public void handleError(ErrorMessage errorMessage) {
logger.error("[handleError][headers:{}]", errorMessage.getHeaders());
logger.error("[handleError][payload:{}]", errorMessage.getPayload().getMessage());
logger.error("[handleError][originalMessage:{}]", errorMessage.getOriginalMessage());
}

@StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) // errorChannel
public void globalHandleError(ErrorMessage errorMessage) {
logger.error("[globalHandleError][payload:{}]", errorMessage.getPayload().getMessage());
logger.error("[globalHandleError][originalMessage:{}]", errorMessage.getOriginalMessage());
logger.error("[globalHandleError][headers:{}]", errorMessage.getHeaders());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,23 @@ spring:
port: 5672 # RabbitMQ 服务的端口
username: guest # RabbitMQ 服务的账号
password: guest # RabbitMQ 服务的密码
publisherConfirms: true
publisherReturns: true
publisher-confirm-type: simple # 设置 Confirm 类型为 SIMPLE 。
publisher-returns: true # 设置消息是否回退,默认为 false
publisher-confirm-type: simple # 设置开启消息确认模型,默认为 null 不进行确认
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-output:
destination: DEMO-TOPIC-01 # 目的地。这里使用 RabbitMQ Exchange
content-type: application/json # 内容格式。这里使用 JSON
binder: rabbit001 # 设置使用的 Binder 名字
producer:
errorChannelEnabled: true
error-channel-enabled: true # 是否开启异常 Channel,默认为 false 关闭
# RabbitMQ 自定义 Binding 配置项,对应 RabbitBindingProperties Map
rabbit:
bindings:
demo01-output:
# RabbitMQ Producer 配置项,对应 RabbitProducerProperties 类
producer:
confirmAckChannel: publisher-confirm
confirm-ack-channel: demo01-producer-confirm # 设置发送确认的 Channel,默认为 null

server:
port: 18080

0 comments on commit fd39615

Please sign in to comment.