diff --git a/labx-10/labx-10-sc-stream-rabbitmq-producer-confirm/src/main/java/cn/iocoder/springcloud/labx10/rabbitmqdemo/producerdemo/controller/Demo01Controller.java b/labx-10/labx-10-sc-stream-rabbitmq-producer-confirm/src/main/java/cn/iocoder/springcloud/labx10/rabbitmqdemo/producerdemo/controller/Demo01Controller.java index 032b356bf..cfee040e1 100644 --- a/labx-10/labx-10-sc-stream-rabbitmq-producer-confirm/src/main/java/cn/iocoder/springcloud/labx10/rabbitmqdemo/producerdemo/controller/Demo01Controller.java +++ b/labx-10/labx-10-sc-stream-rabbitmq-producer-confirm/src/main/java/cn/iocoder/springcloud/labx10/rabbitmqdemo/producerdemo/controller/Demo01Controller.java @@ -5,7 +5,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.integration.annotation.ServiceActivator; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; @@ -35,33 +34,4 @@ public boolean send() { return mySource.demo01Output().send(springMessage); } -// @StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) // errorChannel -// public void globalHandleError(ErrorMessage errorMessage) { -// logger.error("[globalHandleError][payload:{}]", errorMessage.getPayload().getMessage()); -// logger.error("[globalHandleError][originalMessage:{}]", errorMessage.getOriginalMessage()); -// logger.error("[globalHandleError][headers:{}]", errorMessage.getHeaders()); -// } - -// @StreamListener("ooxx") // errorChannel -// public void ooxx(ErrorMessage errorMessage) { -// logger.error("[globalHandleError][payload:{}]", errorMessage.getPayload().getMessage()); -// logger.error("[globalHandleError][originalMessage:{}]", errorMessage.getOriginalMessage()); -// logger.error("[globalHandleError][headers:{}]", errorMessage.getHeaders()); -// } - -//// @ServiceActivator(inputChannel = "demo-producer-application.ooxx") -// @ServiceActivator(inputChannel = "demo-producer-application.ooxx") -//// @StreamListener("demo-producer-application.ooxx") // errorChannel -// public void handleError(Message errorMessage) { -//// logger.error("[handleError][payload:{}]", errorMessage.getPayload().getMessage()); -//// logger.error("[handleError][originalMessage:{}]", errorMessage.getOriginalMessage()); -//// logger.error("[handleError][headers:{}]", errorMessage.getHeaders()); -// System.out.println(); -// } - - @ServiceActivator(inputChannel = "publisher-confirm") - public void onPublisherConfirm(Message message) { - logger.debug("on publisher confirm"); - } - } diff --git a/labx-10/labx-10-sc-stream-rabbitmq-producer-confirm/src/main/java/cn/iocoder/springcloud/labx10/rabbitmqdemo/producerdemo/message/Demo01ProducerConfirmCallback.java b/labx-10/labx-10-sc-stream-rabbitmq-producer-confirm/src/main/java/cn/iocoder/springcloud/labx10/rabbitmqdemo/producerdemo/message/Demo01ProducerConfirmCallback.java new file mode 100644 index 000000000..f2738f48b --- /dev/null +++ b/labx-10/labx-10-sc-stream-rabbitmq-producer-confirm/src/main/java/cn/iocoder/springcloud/labx10/rabbitmqdemo/producerdemo/message/Demo01ProducerConfirmCallback.java @@ -0,0 +1,20 @@ +package cn.iocoder.springcloud.labx10.rabbitmqdemo.producerdemo.message; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Component; + +@Component +public class Demo01ProducerConfirmCallback { + + private Logger logger = LoggerFactory.getLogger(getClass()); + + @ServiceActivator(inputChannel = "demo01-producer-confirm") + public void onPublisherConfirm(Message message) { + logger.info("[onPublisherConfirm][headers:{}]", message.getHeaders()); + logger.info("[onPublisherConfirm][payload:{}]", message.getPayload()); + } + +} diff --git a/labx-10/labx-10-sc-stream-rabbitmq-producer-confirm/src/main/java/cn/iocoder/springcloud/labx10/rabbitmqdemo/producerdemo/message/Demo01ProducerReturnCallback.java b/labx-10/labx-10-sc-stream-rabbitmq-producer-confirm/src/main/java/cn/iocoder/springcloud/labx10/rabbitmqdemo/producerdemo/message/Demo01ProducerReturnCallback.java new file mode 100644 index 000000000..c1ca385eb --- /dev/null +++ b/labx-10/labx-10-sc-stream-rabbitmq-producer-confirm/src/main/java/cn/iocoder/springcloud/labx10/rabbitmqdemo/producerdemo/message/Demo01ProducerReturnCallback.java @@ -0,0 +1,30 @@ +package cn.iocoder.springcloud.labx10.rabbitmqdemo.producerdemo.message; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.context.IntegrationContextUtils; +import org.springframework.messaging.support.ErrorMessage; +import org.springframework.stereotype.Component; + +@Component +public class Demo01ProducerReturnCallback { + + private Logger logger = LoggerFactory.getLogger(getClass()); + + @ServiceActivator(inputChannel = "DEMO-TOPIC-01.errors") + public void handleError(ErrorMessage errorMessage) { + logger.error("[handleError][headers:{}]", errorMessage.getHeaders()); + logger.error("[handleError][payload:{}]", errorMessage.getPayload().getMessage()); + logger.error("[handleError][originalMessage:{}]", errorMessage.getOriginalMessage()); + } + + @StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) // errorChannel + public void globalHandleError(ErrorMessage errorMessage) { + logger.error("[globalHandleError][payload:{}]", errorMessage.getPayload().getMessage()); + logger.error("[globalHandleError][originalMessage:{}]", errorMessage.getOriginalMessage()); + logger.error("[globalHandleError][headers:{}]", errorMessage.getHeaders()); + } + +} diff --git a/labx-10/labx-10-sc-stream-rabbitmq-producer-confirm/src/main/resources/application.yml b/labx-10/labx-10-sc-stream-rabbitmq-producer-confirm/src/main/resources/application.yml index 20d859ac9..470e5d3f5 100644 --- a/labx-10/labx-10-sc-stream-rabbitmq-producer-confirm/src/main/resources/application.yml +++ b/labx-10/labx-10-sc-stream-rabbitmq-producer-confirm/src/main/resources/application.yml @@ -16,9 +16,8 @@ spring: port: 5672 # RabbitMQ 服务的端口 username: guest # RabbitMQ 服务的账号 password: guest # RabbitMQ 服务的密码 - publisherConfirms: true - publisherReturns: true - publisher-confirm-type: simple # 设置 Confirm 类型为 SIMPLE 。 + publisher-returns: true # 设置消息是否回退,默认为 false + publisher-confirm-type: simple # 设置开启消息确认模型,默认为 null 不进行确认 # Binding 配置项,对应 BindingProperties Map bindings: demo01-output: @@ -26,14 +25,14 @@ spring: content-type: application/json # 内容格式。这里使用 JSON binder: rabbit001 # 设置使用的 Binder 名字 producer: - errorChannelEnabled: true + error-channel-enabled: true # 是否开启异常 Channel,默认为 false 关闭 # RabbitMQ 自定义 Binding 配置项,对应 RabbitBindingProperties Map rabbit: bindings: demo01-output: # RabbitMQ Producer 配置项,对应 RabbitProducerProperties 类 producer: - confirmAckChannel: publisher-confirm + confirm-ack-channel: demo01-producer-confirm # 设置发送确认的 Channel,默认为 null server: port: 18080