一文搞懂RabbitMQ的ack与nack

2021-04-15 15:32:54 浏览数 (2)

使用 MQ 时,需要注意保证消息不会丢失且被准确消费。

代码语言:javascript复制
connection = factory.newConnection();
      
final Channel channel = connection.createChannel();
channel.queueDeclare("队列名", true, false, false, null);


//第二个参数设为true为自动应答,false为手动ack
channel.basicConsume("队列名", true, new DefaultConsumer(channel){
    @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
       try {
          Thread.sleep(10000);
          System.out.println(new String(body, "UTF-8"));
        //模拟异常
        int i = 1/0;
        //手动ack
        //channel.basicAck(envelope.getDeliveryTag(), false); 
      } catch (Exception e) {
        
        //重新放入队列
        //channel.basicNack(envelope.getDeliveryTag(), false, true);
        //抛弃此条消息
        //channel.basicNack(envelope.getDeliveryTag(), false, false);
        e.printStackTrace();


      }finally {
        
      }
            
        }
});

handleDelivery是回调方法,如果队列中有消息就会执行这个方法,参数中的body就是消息内容。 channel.basicConsume 方法中第二个参数为boolean 类型,意思是消息的ack 需要自动(true)还是手动(false)。

ack机制为自动

不管 try 中有没有异常,消息管理界面上队列里的消息都被消费了,没有了(ready和unacked状态栏都没有了),下面是管理界面, 队列中的未被消费的消息有多少条都会在ready状态栏下,分发到消费端后,消费端没有回发ack的消息会在unacked状态栏中。

手动ack应答(channel.basicAck方法)

这样做的目的是保证消息在正确消费后给回馈,说明我正确消费了。这时队列就可以把这条消息删除了,如果消费端接收了消息,但是没有给返回ack应答,那么这条消息会继续存在unacked状态下,占据队列的空间,等到空间满了,就会出现接下来的消息不能被消费的情况。

正确的消息被ack了,那么在消费过程中发生异常怎么办?该条消费肯定不能返回ack应答了,这时就需要channel.basicNack,该方法解决了消费异常情况下该条消息怎么处理,有两种办法:

  1. 这条消息重新放回队列,重新消费
  2. 抛弃此条消息

具体使用哪个方法,这种情况下,建议捕捉异常类型,判断是哪种异常,再做具体处理。

0 人点赞