- Tại sao CLUSTER? Để tăng tính sẵn sàng (availability) của hệ thống
- Tại sao Spring Boot? Đây là framework JAVA hỗ trợ lập trình viên phát triển nhanh ứng dụng ở mức chuyên nghiệp
- Yêu cầu biết các đối tượng, khái niệm cơ bản trong mô hình chương trình sử dụng amqp như publisher, exchange, queue, consumer, messsage, broker, channel
-
Message durability:
- Khi RabbitMQ crash hoặc quit các message của queue sẽ bị mất. Để message không bị mất ta cần đảm bảo 2 thứ: queue và message được đánh dấu durable
- Mặc định Spring đánh dấu queue là durable khi khai báo
public class RabbitmqConfiguration { @Bean public Queue defaultQueue() { Map<String, Object> arguments = new HashMap<>(); return new Queue(defaultQueue); } }
- Mặc định Spring đánh dấu message là durable
public class RabbitmqPublisher { private void pushMultipleMessageToQueue(String queue) { for (int i = 1; i < 1000; i ++) { MyMessage message = new MyMessage(i, "Title " + i, "Message " + i); rabbitTemplate.convertAndSend(queue, message); } } }
- Để gửi non-persistent message với SPRING ta làm như sau
public class RabbitmqPublisher { private void pushNonPersistent(String queue) { for (int i = 1; i < 1000; i ++) { MyMessage myMessage = new MyMessage(i, "Title " + i, "Message " + i); rabbitTemplate.convertAndSend(myMessage, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); return message; } }); } } }
- Dù queue và message được đánh dấu durable vẫn không đảm bảo rằng messsage không bị mất. Vì mặc dù chương trình đã bảo RabbitMQ lưu messsage xuống ổ cứng nhưng vẫn có một khoảng thời gian ngắn message được lưu ở cache (RabbitMQ không thực hiện lưu xuống ổ cứng - fsync cho mọi message)
-
Delivery tag:
- Khi consumer đăng ký nhận message với RabbitMQ, message được RabbitMQ chuyển đi bằng phương thức basic.deliver. Phương thức này mang theo delivery tag thông số độc nhất (unique) trên một channel
-
Acknowledgement (ACK):
-
RabbitMQ sử dụng protocol AMQP để giao tiếp với publisher và consumer. Protocol này (cũng như các protocol tương tự STOMP, MQTT, ...) không đảm bảo rằng message được chuyển đến đích và được xử lý thành công. Vì vậy cần có một cơ chế để xác nhận việc chuyển và xử lý thành công này.
-
Việc xác nhận chuyển và xử lý từ consumer tới RabbitMQ được gọi là ACK
-
Việc xác nhận từ RabbitMQ tới publisher là một phần mở rộng của giao thức AMQP và được gọi là publisher confirm
-
Các chế độ ACK:
- basic.ack xác nhận rằng message được được chuyển và xử lý thành công
- basic.reject xác nhận rằng message được chuyển thành công, xử lý không thành công và có thể yêu cầu RabbitMQ xoá hay không xoá message
- basic.nack tương tự basic.reject nhưng có thể gửi reject cho nhiều message
-
Khi nào RabbitMQ gửi confirm tới publisher?
- với message không định tuyến được: Khi exchange xác nhận rằng không thể định tuyến được messsage
- với message định tuyến được và không durable: hi được chấp nhật bởi tất cả các queue
- với message định tuyến được và durable: khi được lưu xuống ổ cứng
-
Với Spring mặc định nếu không có Exception được throw hoặc nếu có nhưng nằm ngoài 6 loại exception được quy định trước Spring sẽ gửi bản tin ACK cho RabbitMQ. Nếu có Exception và nằm ngoài 6 loại exception được quy định trước thì bản tin NACK sẽ được gửi (không thấy tài liệu nào nói về việc dùng auto ACK cho consumer với Spring như ví dụ với Java thuần của RabbitMQ).
-
Chủ động ACK từ consumer tới RabbitMQ với Spring
- Chủ động ACK, gửi bản tin REJECT khi xử lý không thành công và yêu cầu RabbitMQ requeue
public class RabbitmqConsumer { @RabbitListener(queues = "${rabbitmq.queue.advance}") public void handleAdvanceQueue(MyMessage message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception { boolean isProcessOk = processBussinessLogic(); if(isProcessOk){ //ok send ACK channel.basicAck(tag, false); }else { //nok send REJECT and requeue channel.basicReject(tag, true); } } }
- Thiết lập chủ động ACK trong file cấu hình
#application.properties spring.rabbitmq.listener.acknowledge-mode=manual
-
Yêu cầu ACK từ RabbitMQ bởi publisher với Spring
- Bổ sung CorrelationData cho model
public class MyMessage implements Serializable { public CorrelationData getCorrelationData() { return new CorrelationData(id + "|" + title); } }
- Thiết lập việc bắt sự kiện khi RabbitMQ gửi ACK
public class RabbitmqPublisherConfirm { private void setupCallbacks() { rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { //receive this callback when message is not routable log.info("***************************************************************************************"); log.error(String.format("Received returned message with result %s|%s|%s|%s|%s", message.getBody(), replyCode, replyText, exchange, routingKey)); log.debug(String.format("Message detail ", message)); }); rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { //receive this callback with ack/nack from RabbitMQ log.info("***************************************************************************************"); log.info(String.format("Message received by broker %s|%s|%s", correlationData, ack, cause)); //retry if (!ack) { if (correlationData instanceof CorrelationDataWithMessage) { CorrelationDataWithMessage completeCorrelationData = (CorrelationDataWithMessage) correlationData; rabbitTemplate.send("", defaultQueue, completeCorrelationData.getMessage()); } } }); /* * Replace the correlation data with one containing the converted message in case * we want to resend it after a nack. */ rabbitTemplate .setCorrelationDataPostProcessor((message, correlationData) -> new CorrelationDataWithMessage( correlationData != null ? correlationData.getId() : null, message)); } }
- Mở rộng class CorrelationData (dữ liệu RabbitMQ sẽ trả về cho publisher cùng thông tin ACK) chứa message dùng để gửi lại cho RabbitMQ khi nhận được NACK
public class CorrelationDataWithMessage extends CorrelationData { private final Message message; public CorrelationDataWithMessage(String id, Message message){ super(id); this.message = message; } public Message getMessage() { return message; } @Override public String toString() { return "CorrelationDataWithMessage [id=" + getId() + ", message=" + message + "]"; } }
- Thực hiện gửi bản tin tới RabbitMQ
public class RabbitmqPublisherConfirm { public void publishConfirm() throws Exception { setupCallbacks(); MyMessage myMessage = new MyMessage(99, "test message", "test body"); // send a message to the default exchange to be routed to the queue rabbitTemplate.convertAndSend("", defaultQueue, myMessage, myMessage.getCorrelationData()); // send a message to the default exchange to be routed to a non-existent queue rabbitTemplate.convertAndSend("", defaultQueue + defaultQueue, "bar"); } }
- Thiết lập confirmCallback và returnCallback trong file cấu hình
#application.properties spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-returns=true
-
-
_Đa luồng với consumer
-
Mặc định Spring chỉ chạy 1 consumer khi dùng RabbitListener
-
Ta có thể thay đổi số luồng (thread) consumer bằng 2 cách sau:
- Cấu hình trong file cấu hình (Spring Boot version >= v1.5.6.RELEASE)
#appllication.properties spring.rabbitmq.listener.simple.concurrency= # Số consumer tối thiểu khởi tạo khi chạy chương trình. spring.rabbitmq.listener.simple.max-concurrency= # Số consumer tối đa tạo ra khi nhận message từ RabbitMQ.
- Tạo bean SimpleRabbitListenerContainerFactory và thiết lập các cấu hình trong Configuration class
public class RabbitmqConfiguration { @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConcurrentConsumers(concurrentConsumers); factory.setMaxConcurrentConsumers(maxConcurrentConsumers); factory.setConnectionFactory(connectionFactory); return factory; } }
-
Khi chạy đa luồng consumer như thế này thì mỗi consumer sẽ sử dụng 1 channel riêng (delivery tag của channel khác nhau có thể giống nhau) nhưng dùng chung 1 connection
-
-
Cài đặt erlang, RabbitMQ theo hướng dẫn trên trang chủ của RabbitMQ
-
Chạy các node sử dụng file cấu hình data/rabbitmq.config:
- chú ý đảm bảo mỗi node có các tham số cấu hình sau đây khác nhau:
- node names: RABBITMQ_NODENAME
- data store locations: RABBITMQ_MNESIA_DIR
- log file locations: RABBITMQ_LOG_BASE, RABBITMQ_CONFIG_FILE
- bind to different ports, including those used by plugins: RABBITMQ_NODE_PORT, RABBITMQ_DIST_PORT
- chạy node thứ nhất: RABBITMQ_NODE_PORT=5671 RABBITMQ_DIST_PORT=25671 RABBITMQ_NODENAME=rabbit RABBITMQ_CONFIG_FILE=/u01/applications/rabbitmq/instance1/rabbitmq RABBITMQ_MNESIA_DIR=/u01/applications/rabbitmq/instance1/data RABBITMQ_LOG_BASE=/u01/applications/rabbitmq/instance1/log rabbitmq-server -detached
- chạy node thứ hai: RABBITMQ_NODE_PORT=5672 RABBITMQ_DIST_PORT=25672 RABBITMQ_NODENAME=rabbit2 RABBITMQ_CONFIG_FILE=/u01/applications/rabbitmq/instance2/rabbitmq RABBITMQ_MNESIA_DIR=/u01/applications/rabbitmq/instance2/data RABBITMQ_LOG_BASE=/u01/applications/rabbitmq/instance2/log rabbitmq-server -detached
- chú ý đảm bảo mỗi node có các tham số cấu hình sau đây khác nhau:
-
Join node thứ hai với node thứ nhất rabbitmqctl -n rabbit2 stop_app rabbitmqctl -n rabbit2 join_cluster rabbit@
hostname -s
rabbitmqctl -n rabbit2 start_app -
Thiết lập mirror policy cho các queue có tên bắt đầu với "queue."
rabbitmqctl -n rabbit set_policy ha-two "^queue\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
-
Từ v1.9.0 NGINX hỗ trợ LB TCP với thẻ stream
-
Kh cài đặt NGINX từ source thêm cấu hình --with-stream
-
Cấu hình NGINX
stream { upstream rabbitmq_backend { zone rabbitmq_backend 64k; server localhost:5671 weight=5 max_fails=2 fail_timeout=30s; server localhost:5672 backup; server localhost:5673 weight=5 max_fails=2 fail_timeout=30s; } server { listen 5677; proxy_pass rabbitmq_backend; } }