RabbitMQ 消息确认详解

2021-12-17 22:30:11 浏览数 (1)

引言

RabbitMQ的模型是生产者发送信息到 Broker (代理),消费者从 Broker 中取出信息。但是生产者怎么知道消息是否真的发送到 Broker

中了呢?Broker 又怎么知道消息到底有没有被消费者消费?

如果由于网络原因出现故障,生产者生产的消息未到达 Broker 或者 Broker 的消息被虚假消费,而它们又不知道,就会产生很严重的问题,如重复消费等。

01 RabbitMQ的消息确认流程

从图中可以看出:

消息确认机制分为生产者确认和消费者确认

  • ConfirmCallback 生产者
  • ReturnCallback 生产者
  • ACK 消费者

02 生产者确认

  • 消息到达RabbitMQ的Exchange:Exchange向生产者发送Confirm确认。成功抑或失败都会返回一个confirmCallback
  • 消息成功达到Exchange,但是从Exchange投递Queue失败:向生产者返回一个returnCallback。只有失败才会返回

03 消费者确认

  • 消费者收到消息后需要对 RabbitMQ Server 进行消息 ACK 确认,RabbitMQ 根据确认信息决定是删除队列中的该信息还是重新发送

04 代码实现

4.1 生产者确认

重点在于生产者重写下面两个方法

  • rabbitMQTemplate.setConfirmCallback
  • rabbitMQTemplate.setReturnCallback

1.开启生产者消息确认

代码语言:txt复制
spring:
代码语言:txt复制
  rabbitmq:
代码语言:txt复制
    host: localhost
代码语言:txt复制
    port: 5672
代码语言:txt复制
    virtual-host: /
代码语言:txt复制
    username: root
代码语言:txt复制
    password: root
代码语言:txt复制
    #    开启两个模式的生产者消息确认
代码语言:txt复制
    publisher-confirm-type: simple
代码语言:txt复制
    publisher-returns: true

2.声明交换机、队列,绑定交换机和队列

代码语言:txt复制
@Configuration
代码语言:txt复制
public class RabbitMQConfig {
代码语言:txt复制
    private static final String SB_TOPIC_EXCHANGE="sb_topic_exchange";
代码语言:txt复制
    private static final String SB_TOPIC_QUEUE="sb_topic_queue1";
代码语言:txt复制
    // 注入交换机 topic类型
代码语言:txt复制
    @Bean("topicExchange")
代码语言:txt复制
    public Exchange topicExchange(){
代码语言:txt复制
        return ExchangeBuilder.topicExchange(SB_TOPIC_EXCHANGE).durable(true)
代码语言:txt复制
                .autoDelete().build();
代码语言:txt复制
    }
代码语言:txt复制
    // 声明队列
代码语言:txt复制
    @Bean
代码语言:txt复制
    public Queue queue1(){
代码语言:txt复制
        return QueueBuilder.durable(SB_TOPIC_QUEUE).build();
代码语言:txt复制
    }
代码语言:txt复制
    // 绑定队列和交换机
代码语言:txt复制
    @Bean
代码语言:txt复制
    public Binding exchangQueue(@Qualifier("queue1") Queue queue, @Qualifier("topicExchange") Exchange exchange){
代码语言:txt复制
        return BindingBuilder.bind(queue).to(exchange).with("user.#").noargs();
代码语言:txt复制
    }

3.创建消费者

代码语言:txt复制
@Component
代码语言:txt复制
@RabbitListener(queues = "sb_topic_queue1")
代码语言:txt复制
public class Consumer {
代码语言:txt复制
    @RabbitHandler
代码语言:txt复制
    public void testPublishConfirm(String msg) {
代码语言:txt复制
        System.out.println("收到的信息:" msg);
代码语言:txt复制
    }
代码语言:txt复制
}

4.创建生产者

创建生产者发送消息到消息队列,模拟两种异常情况

代码语言:txt复制
@SpringBootTest
代码语言:txt复制
class RabiitmqSpringbootApplicationTests {
代码语言:txt复制
    @Autowired
代码语言:txt复制
    RabbitTemplate template;
代码语言:txt复制
    @Test
代码语言:txt复制
    void testConfirmTrue() {
代码语言:txt复制
        // 设置confirm回调函数
代码语言:txt复制
        template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
代码语言:txt复制
            @Override
代码语言:txt复制
            public void confirm(CorrelationData correlationData, boolean b, java.lang.String s) {
代码语言:txt复制
                if (b) System.out.println("消息发送成功");
代码语言:txt复制
                else System.out.println("消息发送失败");
代码语言:txt复制
            }
代码语言:txt复制
        });
代码语言:txt复制
        // 模拟生产者发送信息--正常情况
代码语言:txt复制
        template.convertAndSend("sb_topic_exchange","user.info","日志级别:info;日志模块:user;日志信息:*****");
代码语言:txt复制
    }
代码语言:txt复制
    @Test
代码语言:txt复制
    void testConfirmFalse() {
代码语言:txt复制
        // 设置confirm回调函数
代码语言:txt复制
        template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
代码语言:txt复制
            @Override
代码语言:txt复制
            public void confirm(CorrelationData correlationData, boolean b, java.lang.String s) {
代码语言:txt复制
                if (b) System.out.println("消息发送成功");
代码语言:txt复制
                else System.out.println("消息发送失败");
代码语言:txt复制
            }
代码语言:txt复制
        });
代码语言:txt复制
        // 模拟生产者发送信息
代码语言:txt复制
        // 不存在的交换机--异常情况
代码语言:txt复制
     template.convertAndSend("sb_topic_exchange_noexist","user.info","日志级别:info;日志模块:user;日志信息:*****");
代码语言:txt复制
    }
代码语言:txt复制
    @Test
代码语言:txt复制
    void testReturnFalse() {
代码语言:txt复制
        // 设置return回调函数
代码语言:txt复制
        template.setReturnCallback(new RabbitTemplate.ReturnCallback() {
代码语言:txt复制
            @Override
代码语言:txt复制
            public void returnedMessage(Message message, int i, java.lang.String s, java.lang.String s1, java.lang.String s2) {
代码语言:txt复制
                System.out.println(message.toString());
代码语言:txt复制
                System.out.println(s "*********");
代码语言:txt复制
            }
代码语言:txt复制
        });
代码语言:txt复制
        template.setMandatory(true);
代码语言:txt复制
        // 模拟生产者发送信息
代码语言:txt复制
        // 正确的交换机 错误的routekey -- 异常情况
代码语言:txt复制
     template.convertAndSend("sb_topic_exchange","noexist.user.info","日志级别:info;日志模块:user;日志信息:*****");
代码语言:txt复制
    }

4.2 消费者确认

重点在于消费者的下面两个方法

  • channel.basicAck 消费者签收
  • channel.basicNAck 消费者拒绝签收

1.开启消费者确认模式

代码语言:txt复制
spring:
代码语言:txt复制
  rabbitmq:
代码语言:txt复制
    host: localhost
代码语言:txt复制
    port: 5672
代码语言:txt复制
    virtual-host: /
代码语言:txt复制
    username: root
代码语言:txt复制
    password: root
代码语言:txt复制
#    设置消费端手动签收
代码语言:txt复制
    listener:
代码语言:txt复制
      direct:
代码语言:txt复制
        acknowledge-mode: manual
代码语言:txt复制
      simple:
代码语言:txt复制
        acknowledge-mode: manual

2.创建消费者

代码语言:txt复制
/**
代码语言:txt复制
 * 注入消费者--手动签到
 */
@Component
@RabbitListener(queues = "sb_topic_queue1")
public class Consumer2 {
代码语言:txt复制
    @RabbitHandler
代码语言:txt复制
    public void testComsumer(String msg, Channel channel, Message message) throws InterruptedException, IOException {
代码语言:txt复制
        // 消费端设置手动签收代码
代码语言:txt复制
        try {
代码语言:txt复制
            System.out.println(msg);
代码语言:txt复制
            // 正常签收,mq收到此消息被正常签收后即可从队列中删除vi信息
代码语言:txt复制
            // 是哟了那个channel的方法
代码语言:txt复制
            // 第一个参数是deliverytag 标识哪条信息 第二个参数是是否批量签收
代码语言:txt复制
            // int i=2/0; 模拟异常
代码语言:txt复制
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
代码语言:txt复制
            System.out.println("消费者签收了该信息,服务器你可以删了");
代码语言:txt复制
        }catch (Exception e){
代码语言:txt复制
            // 异常拒绝签收,让mq重发此信息
代码语言:txt复制
            System.out.println("该信息丢了,给我重发");
代码语言:txt复制
       channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);
代码语言:txt复制
           // 该信息丢了,但是不需要你重发
代码语言:txt复制
     // channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
代码语言:txt复制
        }
代码语言:txt复制
    }
代码语言:txt复制
}

3.创建生产者

代码语言:txt复制
@SpringBootTest
代码语言:txt复制
class RabiitmqSpringbootApplicationTests {
代码语言:txt复制
    @Autowired
代码语言:txt复制
    RabbitTemplate template;
代码语言:txt复制
    @Test
代码语言:txt复制
    void testConsumerAck() {
代码语言:txt复制
     template.convertAndSend("sb_topic_exchange","noexist.user.info","日志级别:info;日志模块:user;日志信息:*****");
代码语言:txt复制
    }
代码语言:txt复制
}

0 人点赞