RabbitMQ与SpringBoot2.0整合

2022-04-13 15:34:31 浏览数 (1)

application.properties:

代码语言:javascript复制
spring.rabbitmq.addresses=192.
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

RabbitMQ与SpringBoot整合配置详解:

1. 生产端核心配置


  • publisher-confirms,实现一个监听器用于监听Broker端为我们返回的确认请求: RabbitTemplate.ConfirmCallback
  • publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续的处理,保证消息的路由成功: RabbitTemplate.ReturnCallback
  • 注意一点,在发送消息时候对template进行设置mandatory=true保证监听有效
  • 生产端还可以配置其他属性,比如发送重试、超时时间、次数、间隔等。

RabbitSender:

代码语言:javascript复制
package com.pyy.springboot.producer;


import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class RabbitSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.err.println("correlationData:"   correlationData);
            System.err.println("ack:"   ack);
            if(!ack) {
                System.err.println("异常处理...");
            }else {
                // 更新数据库对应的消息状态:已发送
            }
        }
    };


    final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.err.println("return exchange:"   exchange   " , routingKey:"   routingKey   ", replyCode:"   replyCode   ", replyText:"   replyText);
        }
    };

    public void send(Object message, Map<String, Object> headerProperties) throws Exception {
        MessageHeaders messageHeaders = new MessageHeaders(headerProperties);
        Message msg = MessageBuilder.createMessage(message, messageHeaders);

        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);

        CorrelationData correlationData = new CorrelationData();
        correlationData.setId("userid"   System.currentTimeMillis());// id   时间戳 全局唯一 实际消息的id
        //rabbitTemplate.convertAndSend("pyy.exchange", "springboot.hello", msg, correlationData);

        rabbitTemplate.convertAndSend("pyy.exchange", "fasdfsf.hello", msg, correlationData);

    }
}

2. 消费端核心配置


代码语言:javascript复制
spring.rabbitmq.listener.simple.acknowledge-mode=MANUAL
spring.rabbitmq.listener.simple.concurrency=1
spring.rabbitmq.listener.simple.max-concurrency=5
  • 首先配置ACK手工确认模式,用于ACK的手工处理,这样我可以保证消息的可靠性送达,或者在消费失败时候可以做到重回队列、根据业务记录日志等处理。
  • 可以设置消费端的监听个数和最大个数,用于控制消费端的并发情况

@RabbitListener注解使用

  • 消费端监听@RabbitMQListener注解,这个对于在实际工作中非常的好用
  • @RabbitListener只一个组合的注解,里面可以注解配置@QueueBinding@Queue@Exchange直接通过这个组合注解一次性搞定消费端交换机、队列、绑定、路由、并且配置监听功能等
代码语言:javascript复制
package com.pyy.mq.service;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * 消息接收者
 * @RabbitListener bindings:绑定队列
 *   @QueueBinding  value:绑定队列的名称
 *                exchange:配置交换器
 * 
 *     @Queue value:配置队列名称
 *        autoDelete:是否是一个可删除的临时队列
 * 
 *     @Exchange value:为交换器起个名称
 *           type:指定具体的交换器类型
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.info}", autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT),
                key = "${mq.config.queue.info.routing.key}"
        )
)
public class InfoReceiver {

    /**
     * 接收消息方法,采用消息队列监听机制
     * @param msg
     */
    @RabbitHandler
    public void process(String msg) {
        System.out.println("Info receiver:"   msg);
    }
}

@RabbitListener注解如果没有存在exchange和queue会自动创建

案例详细代码:https://github.com/pyygithub/springboot-rabbitmq

0 人点赞