Skip to content

icovn/spring-boot-rabbitmq-cluster

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

3 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

RabbitMQ Cluster & Spring Boot

Tại sao Cluster? Tại sao Spring Boot?

  • Tại sao CLUSTER? Để tăng tính sẵn sàng (availability) của hệ thống
  • Tại sao Spring Boot? Đây là framework JAVA hỗ trợ lập trình viên phát triển nhanh ứng dụng ở mức chuyên nghiệp
  • Yêu cầu biết các đối tượng, khái niệm cơ bản trong mô hình chương trình sử dụng amqp như publisher, exchange, queue, consumer, messsage, broker, channel

Các khái niệm & cài đặt với Spring Booot

  • Message durability:

    • Khi RabbitMQ crash hoặc quit các message của queue sẽ bị mất. Để message không bị mất ta cần đảm bảo 2 thứ: queuemessage được đánh dấu durable
    • Mặc định Spring đánh dấu queuedurable khi khai báo
    public class RabbitmqConfiguration {
        @Bean
        public Queue defaultQueue() {
            Map<String, Object> arguments = new HashMap<>();
            return new Queue(defaultQueue);
        }
    }
    • Mặc định Spring đánh dấu messagedurable
    public class RabbitmqPublisher {
        private void pushMultipleMessageToQueue(String queue) {
            for (int i = 1; i < 1000; i ++) {
                MyMessage message = new MyMessage(i, "Title " + i, "Message " + i);
                rabbitTemplate.convertAndSend(queue, message);
            }
        }
    }
    • Để gửi non-persistent message với SPRING ta làm như sau
    public class RabbitmqPublisher {
        private void pushNonPersistent(String queue) {
            for (int i = 1; i < 1000; i ++) {
                MyMessage myMessage = new MyMessage(i, "Title " + i, "Message " + i);
                rabbitTemplate.convertAndSend(myMessage, new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
                        return message;
                    }
                });
            }
        }
    }
    • queuemessage được đánh dấu durable vẫn không đảm bảo rằng messsage không bị mất. Vì mặc dù chương trình đã bảo RabbitMQ lưu messsage xuống ổ cứng nhưng vẫn có một khoảng thời gian ngắn message được lưu ở cache (RabbitMQ không thực hiện lưu xuống ổ cứng - fsync cho mọi message)
  • Delivery tag:

    • Khi consumer đăng ký nhận message với RabbitMQ, message được RabbitMQ chuyển đi bằng phương thức basic.deliver. Phương thức này mang theo delivery tag thông số độc nhất (unique) trên một channel
  • Acknowledgement (ACK):

    • RabbitMQ sử dụng protocol AMQP để giao tiếp với publisherconsumer. Protocol này (cũng như các protocol tương tự STOMP, MQTT, ...) không đảm bảo rằng message được chuyển đến đích và được xử lý thành công. Vì vậy cần có một cơ chế để xác nhận việc chuyển và xử lý thành công này.

    • Việc xác nhận chuyển và xử lý từ consumer tới RabbitMQ được gọi là ACK

    • Việc xác nhận từ RabbitMQ tới publisher là một phần mở rộng của giao thức AMQP và được gọi là publisher confirm

    • Các chế độ ACK:

      • basic.ack xác nhận rằng message được được chuyển và xử lý thành công
      • basic.reject xác nhận rằng message được chuyển thành công, xử lý không thành công và có thể yêu cầu RabbitMQ xoá hay không xoá message
      • basic.nack tương tự basic.reject nhưng có thể gửi reject cho nhiều message
    • Khi nào RabbitMQ gửi confirm tới publisher?

      • với message không định tuyến được: Khi exchange xác nhận rằng không thể định tuyến được messsage
      • với message định tuyến được và không durable: hi được chấp nhật bởi tất cả các queue
      • với message định tuyến được và durable: khi được lưu xuống ổ cứng
    • Với Spring mặc định nếu không có Exception được throw hoặc nếu có nhưng nằm ngoài 6 loại exception được quy định trước Spring sẽ gửi bản tin ACK cho RabbitMQ. Nếu có Exception và nằm ngoài 6 loại exception được quy định trước thì bản tin NACK sẽ được gửi (không thấy tài liệu nào nói về việc dùng auto ACK cho consumer với Spring như ví dụ với Java thuần của RabbitMQ).

    • Chủ động ACK từ consumer tới RabbitMQ với Spring

      • Chủ động ACK, gửi bản tin REJECT khi xử lý không thành công và yêu cầu RabbitMQ requeue
      public class RabbitmqConsumer {
          @RabbitListener(queues = "${rabbitmq.queue.advance}")
          public void handleAdvanceQueue(MyMessage message, Channel channel,
                  @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
              boolean isProcessOk = processBussinessLogic();
              if(isProcessOk){
                  //ok send ACK
                  channel.basicAck(tag, false);
              }else {
                  //nok send REJECT and requeue
                  channel.basicReject(tag, true);
              }
          }
      }
      • Thiết lập chủ động ACK trong file cấu hình
      #application.properties
      spring.rabbitmq.listener.acknowledge-mode=manual
      
    • Yêu cầu ACK từ RabbitMQ bởi publisher với Spring

      • Bổ sung CorrelationData cho model
      public class MyMessage implements Serializable {
          public CorrelationData getCorrelationData() {
              return new CorrelationData(id + "|" + title);
          }
      }
      • Thiết lập việc bắt sự kiện khi RabbitMQ gửi ACK
      public class RabbitmqPublisherConfirm {
          private void setupCallbacks() {
              rabbitTemplate.setMandatory(true);
              rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                  //receive this callback when message is not routable
                  log.info("***************************************************************************************");
                  log.error(String.format("Received returned message with result %s|%s|%s|%s|%s",
                          message.getBody(), replyCode, replyText, exchange, routingKey));
                  log.debug(String.format("Message detail ", message));
              });
              rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                  //receive this callback with ack/nack from RabbitMQ
                  log.info("***************************************************************************************");
                  log.info(String.format("Message received by broker %s|%s|%s", correlationData, ack, cause));
      
                  //retry
                  if (!ack) {
                      if (correlationData instanceof CorrelationDataWithMessage) {
                          CorrelationDataWithMessage completeCorrelationData = (CorrelationDataWithMessage) correlationData;
                          rabbitTemplate.send("", defaultQueue, completeCorrelationData.getMessage());
                      }
                  }
              });
      
              /*
               * Replace the correlation data with one containing the converted message in case
               * we want to resend it after a nack.
               */
              rabbitTemplate
                      .setCorrelationDataPostProcessor((message, correlationData) -> new CorrelationDataWithMessage(
                              correlationData != null ? correlationData.getId() : null, message));
          }
      }
      • Mở rộng class CorrelationData (dữ liệu RabbitMQ sẽ trả về cho publisher cùng thông tin ACK) chứa message dùng để gửi lại cho RabbitMQ khi nhận được NACK
      public class CorrelationDataWithMessage extends CorrelationData {
          private final Message message;
      
          public CorrelationDataWithMessage(String id, Message message){
              super(id);
              this.message = message;
          }
      
          public Message getMessage() {
              return message;
          }
      
          @Override
          public String toString() {
              return "CorrelationDataWithMessage [id=" + getId() + ", message=" + message + "]";
          }
      }
      • Thực hiện gửi bản tin tới RabbitMQ
      public class RabbitmqPublisherConfirm {
          public void publishConfirm() throws Exception {
              setupCallbacks();
              MyMessage myMessage = new MyMessage(99, "test message", "test body");
              // send a message to the default exchange to be routed to the queue
              rabbitTemplate.convertAndSend("", defaultQueue, myMessage, myMessage.getCorrelationData());
      
              // send a message to the default exchange to be routed to a non-existent queue
              rabbitTemplate.convertAndSend("", defaultQueue + defaultQueue, "bar");
          }
      }
      • Thiết lập confirmCallback và returnCallback trong file cấu hình
      #application.properties
      spring.rabbitmq.publisher-confirms=true
      spring.rabbitmq.publisher-returns=true
      
  • _Đa luồng với consumer

    • Mặc định Spring chỉ chạy 1 consumer khi dùng RabbitListener

    • Ta có thể thay đổi số luồng (thread) consumer bằng 2 cách sau:

      • Cấu hình trong file cấu hình (Spring Boot version >= v1.5.6.RELEASE)
      #appllication.properties
      spring.rabbitmq.listener.simple.concurrency= # Số consumer tối thiểu khởi tạo khi chạy chương trình.
      spring.rabbitmq.listener.simple.max-concurrency= # Số consumer tối đa tạo ra khi nhận message từ RabbitMQ.
      
      • Tạo bean SimpleRabbitListenerContainerFactory và thiết lập các cấu hình trong Configuration class
      public class RabbitmqConfiguration {
          @Bean
          public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
              SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
              factory.setConcurrentConsumers(concurrentConsumers);
              factory.setMaxConcurrentConsumers(maxConcurrentConsumers);
              factory.setConnectionFactory(connectionFactory);
      
              return factory;
          }
      }
    • Khi chạy đa luồng consumer như thế này thì mỗi consumer sẽ sử dụng 1 channel riêng (delivery tag của channel khác nhau có thể giống nhau) nhưng dùng chung 1 connection

Cluster

Cài đặt cluster trên 1 server

  • Cài đặt erlang, RabbitMQ theo hướng dẫn trên trang chủ của RabbitMQ

  • Chạy các node sử dụng file cấu hình data/rabbitmq.config:

    • chú ý đảm bảo mỗi node có các tham số cấu hình sau đây khác nhau:
      • node names: RABBITMQ_NODENAME
      • data store locations: RABBITMQ_MNESIA_DIR
      • log file locations: RABBITMQ_LOG_BASE, RABBITMQ_CONFIG_FILE
      • bind to different ports, including those used by plugins: RABBITMQ_NODE_PORT, RABBITMQ_DIST_PORT
    • chạy node thứ nhất: RABBITMQ_NODE_PORT=5671 RABBITMQ_DIST_PORT=25671 RABBITMQ_NODENAME=rabbit RABBITMQ_CONFIG_FILE=/u01/applications/rabbitmq/instance1/rabbitmq RABBITMQ_MNESIA_DIR=/u01/applications/rabbitmq/instance1/data RABBITMQ_LOG_BASE=/u01/applications/rabbitmq/instance1/log rabbitmq-server -detached
    • chạy node thứ hai: RABBITMQ_NODE_PORT=5672 RABBITMQ_DIST_PORT=25672 RABBITMQ_NODENAME=rabbit2 RABBITMQ_CONFIG_FILE=/u01/applications/rabbitmq/instance2/rabbitmq RABBITMQ_MNESIA_DIR=/u01/applications/rabbitmq/instance2/data RABBITMQ_LOG_BASE=/u01/applications/rabbitmq/instance2/log rabbitmq-server -detached
  • Join node thứ hai với node thứ nhất rabbitmqctl -n rabbit2 stop_app rabbitmqctl -n rabbit2 join_cluster rabbit@hostname -s rabbitmqctl -n rabbit2 start_app

  • Thiết lập mirror policy cho các queue có tên bắt đầu với "queue."

    rabbitmqctl -n rabbit set_policy ha-two "^queue\."    '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
    

Dùng NGINX để LB RabbitMQ

  • Từ v1.9.0 NGINX hỗ trợ LB TCP với thẻ stream

  • Kh cài đặt NGINX từ source thêm cấu hình --with-stream

  • Cấu hình NGINX

    stream {
        upstream rabbitmq_backend {
            zone rabbitmq_backend 64k;
    
            server localhost:5671 weight=5 max_fails=2 fail_timeout=30s;
            server localhost:5672 backup;
            server localhost:5673 weight=5 max_fails=2 fail_timeout=30s;
        }
    
        server {
            listen        5677;
            proxy_pass    rabbitmq_backend;
        }
    } 
    

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages