Confirm确认消息
理解Confirm消息确认机制
- 消息的确认, 是指投递消息后, 如果Broker收到消息, 则会给我们生产者一个应答
- 生产者进行接收应答用来确定这条消息是否正常的发送到Broker, 这种方式也是消息的可靠性投递的核心保障
Confirm确认消息流程解析
Confirm确认消息实现
- 在Channel上开启确认模式: channel.confirmSelect()
- 在Channel上添加监听: addConfirmListener, 监听成功和失败的返回结果, 根据具体的结果对消息进行
重新发送
,记录日志
或者等后续处理
代码实现
消费者
代码语言:javascript复制package com.dance.redis.mq.rabbit.confirm;
import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class Receiver4ConfirmListener {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQHelper.getChannel();
String exchangeName = "test_confirmlistener_exchange";
String queueName = "test_confirmlistener_queue";
String routingKey = "confirm.#";
RabbitMQHelper.exchangeDeclare(channel,exchangeName,RabbitMQHelper.EXCHANGE_TYPE_TOPIC);
RabbitMQHelper.queueDeclare(channel,queueName);
channel.queueBind(queueName, exchangeName, routingKey);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
System.out.println("receive message:" new String(body) ", RoutingKey: " envelope.getRoutingKey());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(queueName, consumer);
//等待回调函数执行完毕之后,关闭资源。
TimeUnit.SECONDS.sleep(50);
channel.close();
RabbitMQHelper.closeConnection();
}
}
生产者
代码语言:javascript复制package com.dance.redis.mq.rabbit.confirm;
import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class Sender4ConfirmListener {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQHelper.getChannel();
String exchangeName = "test_confirmlistener_exchange";
String routingKey1 = "confirm.save";
String msg = "Hello World RabbitMQ 4 Confirm Listener Message ...";
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
// 处理失败
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println("------- error ---------");
}
// 处理成功
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("------- ok ---------");
}
});
channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
TimeUnit.SECONDS.sleep(50);
channel.close();
RabbitMQHelper.closeConnection();
}
}
测试
启动消费者
启动生产者
收到ok
查看消费者
消费成功