文章目录
- RabbitMQ 学习(七)----发布确认
- (1)开启发布确认的方法
- (2)单个发布确认
- (3)批量确认发布
- (4)异步发布确认
RabbitMQ 学习(七)----发布确认
发布确认是一个保证RabbitMQ 可靠性的一个机制
保证生产者将信息成功的发送到 RabbitMQ的 server端了,那么broker就会回一个确认,如果没有收到或者收到拒绝信息,那么说明可能网络不好没有发送成功,server端宕机了,broker拒绝接收等情况,如果不进行后续处理,那么信息就会丢失,生产者收到失败的消息使用回调函数在进行处理。
生产者将信道设置成 confirm 模式,所有在该信道上发布的消息都会指定一个唯一的ID,一旦消息投递到队列中,就是发送成功了,broker会立刻发送一个确认ack 给生产者,这个时候,生产者就知道消息已经发送成功了。
如果队列和信息是持久化的,那么确认消息会在将消息写入磁盘之后再发出,broker返回的确认包含 确认消息的序列号,还可以设置 multiple,表示此序号前的所有消息都得到了处理。
一旦发布消息,生产者等待确认的同时继续发送下一条消息,如果rabbitMq自身内部错误导致消息为发送成功,生产者就可以再回调方法中继续处理。
为了保证RabbitMQ的可靠性,生产者怎么做?
1、设置要求队列持久化
2、设置队列中的消息持久化
3、发布确认,保证写入磁盘,broker成功收到
(1)开启发布确认的方法
channel 的cofirm模式默认是没有开启的,如果需要开启需要调用 confirmSelect(),当我们使用发布确认的时候。需要使用channel调用该方法。
代码语言:javascript复制Channel channel =connection.createChannel();
channel.confirmSelect();
(2)单个发布确认
这是一种同步确认发布的方式,就是发布一个消息之后等待确认后,后续的消息才能继续发布。waitForConfiemOrDie(long)
这个方法只有当消息被确认才会返回,如果在指定的时间内未返回就会抛出异常。
这种确认方式最大的缺点:速度特别慢。如果消息没有确认,就会阻塞后续消息的发送,造成发送消息的速度很慢。
代码语言:javascript复制public class SingleConfirm {
/**
* 发布确认模式
* 1、单个确认
* @param args
*/
public static void main(String[] args) {
Connection connection = RabbitMQUtils.getConnect();
Channel channel = null;
try {
channel = connection.createChannel();
// 开启确认模式
channel.confirmSelect();
// 声明队列
channel.queueDeclare("confirm", true, false, false, null);
long begin = System.currentTimeMillis();
// 批量发送消息,每次发送进行确认
for (int i = 0; i <1000 ; i ) {
String message = i "";
// 发布单条消息
channel.basicPublish("", "confirm", null, message.getBytes());
// 单个消息发送之后,马上发布确认,使用 waitForConfirms
if(channel.waitForConfirms()){
//System.out.println("消息发送成功:" i);
}
}
long end = System.currentTimeMillis();
System.out.println("发送1000条数据,使用单个发布确认的时间为:" (end-begin));
} catch (Exception e){
e.printStackTrace();
}finally {
RabbitMQUtils.close(channel, connection);
}
}
}
非常浪费时间
(3)批量确认发布
每发送一部分消息,批量同步确认一次,若有消息无法发出,该模式无法确认是哪个消息无法发送;
发布1000条消息,每发送100条确认一次
代码语言:javascript复制public class MultipleConfirm {
public static void main(String[] args) {
Connection connection = RabbitMQUtils.getConnect();
Channel channel = null;
try {
channel = connection.createChannel();
// 开启确认模式
channel.confirmSelect();
// 声明队列
channel.queueDeclare("confirm", true, false, false, null);
long begin = System.currentTimeMillis();
// 批量确认消息的数量,没发送100个返回一个确认ack
int notAck = 100;
// 批量发送消息,每次发送进行确认
for (int i = 0; i <1000 ; i ) {
String message = i "";
// 发布单条消息
channel.basicPublish("", "confirm", null, message.getBytes());
if((i 1)%notAck==0){
// 每发送100条确认一次,查看是否这一批是否有发送失败的情况
channel.waitForConfirmsOrDie();
}
}
long end = System.currentTimeMillis();
System.out.println("发送1000条数据,使用批量发布确认的时间为:" (end-begin));
} catch (Exception e){
e.printStackTrace();
}finally {
RabbitMQUtils.close(channel, connection);
}
}
}
时间为 396毫秒
(4)异步发布确认
生产者发送消息与 接收确认这两个步骤不是同步的,是异步的,生产者只管发送,同时使用监听(addConfirmListener)返回的确认,对成功确认、失败确认两种情况分别进行处理。非常高效且安全
- 开启确认模式
- 声明确认成功的callback
- 声明确认失败的callback
- 开启确认监听 addConfirmListener() ,设置callback
- 信道发送消息,不需要额外设置接收waitForConfirm什么的
public static void main(String[] args) {
Connection connection = RabbitMQUtils.getConnect();
Channel channel = null;
try {
channel = connection.createChannel();
// 开启确认模式
channel.confirmSelect();
// 声明队列
channel.queueDeclare("confirm", true, false, false, null);
// 作为接收成功的函数式接口 参数
ConfirmCallback ackCallback =(deliveryTag, multiple)-> System.out.println("确认的消息: " deliveryTag);
// 表示接收成功的回调函数
// 作为接收失败的函数式接口 参数
ConfirmCallback nackCallback = (deliveryTag,multiple)-> System.out.println("接收失败!");
// 表示接收失败的回调函数
// 这是一个异步的监听 消息返回确认信息的 反应
channel.addConfirmListener(ackCallback,nackCallback);
long begin = System.currentTimeMillis();
// 批量发送消息,每次发送进行确认
for (int i = 0; i <1000 ; i ) {
String message = i "";
// 发布单条消息
channel.basicPublish("", "confirm", null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("发送1000条数据,使用异步发布确认的时间为:" (end-begin) "ms");
} catch (Exception e){
e.printStackTrace();
}
// 执行完不能关闭连接,还要继续监听确认的信息
/**
finally {
RabbitMQUtils.close(channel, connection);
}
*/
}
时间非常高效