使用 MQ 时,需要注意保证消息不会丢失且被准确消费。
代码语言:javascript复制connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare("队列名", true, false, false, null);
//第二个参数设为true为自动应答,false为手动ack
channel.basicConsume("队列名", true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(10000);
System.out.println(new String(body, "UTF-8"));
//模拟异常
int i = 1/0;
//手动ack
//channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
//重新放入队列
//channel.basicNack(envelope.getDeliveryTag(), false, true);
//抛弃此条消息
//channel.basicNack(envelope.getDeliveryTag(), false, false);
e.printStackTrace();
}finally {
}
}
});
handleDelivery是回调方法,如果队列中有消息就会执行这个方法,参数中的body就是消息内容。 channel.basicConsume 方法中第二个参数为boolean 类型,意思是消息的ack 需要自动(true)还是手动(false)。
ack机制为自动
不管 try 中有没有异常,消息管理界面上队列里的消息都被消费了,没有了(ready和unacked状态栏都没有了),下面是管理界面, 队列中的未被消费的消息有多少条都会在ready状态栏下,分发到消费端后,消费端没有回发ack的消息会在unacked状态栏中。
手动ack应答(channel.basicAck方法)
这样做的目的是保证消息在正确消费后给回馈,说明我正确消费了。这时队列就可以把这条消息删除了,如果消费端接收了消息,但是没有给返回ack应答,那么这条消息会继续存在unacked状态下,占据队列的空间,等到空间满了,就会出现接下来的消息不能被消费的情况。
正确的消息被ack了,那么在消费过程中发生异常怎么办?该条消费肯定不能返回ack应答了,这时就需要channel.basicNack
,该方法解决了消费异常情况下该条消息怎么处理,有两种办法:
- 这条消息重新放回队列,重新消费
- 抛弃此条消息
具体使用哪个方法,这种情况下,建议捕捉异常类型,判断是哪种异常,再做具体处理。