RabbitMQ的特性是保障数据的一致性,稳定性和可靠性。但是如何来保障这些了?这就有了很多的保障机制。在前面的文章体系中也是介绍到RabbitMQ中的生产者负责把消息发送到Exchange,并不需要关心Queue是什么,那么问题就出现了,如果生产者发送的MQ消息消费者没有收到了?这如何可以做到前面说的数据的一致性以及可靠性了。我们可以结合现实的例子来看这部分,比如我向别人借了100元,然后我要了对方的银行卡号,把钱还给了对方,但是我给对方没有说,那么其实对方是不知道的,所以在对方的心理我始终还是欠他100元的,其实这样的案例在我实际的生活就出现过,当然是很多年前的事了,总是这过程确认反馈的机制。技术也是需要符合人性的,那么RabbitMQ为了做到数据的一致性的保障,在生产者端就有Confirm的确认机制。
一、Confirm确认消息
RabbitMQ消息队列服务器生产者Confirm确认消息可以具体总结为如下:
- 生产者发送的消息发送到Exchange后,如果RabbitMQ的Broker收到消息,需要给生产者一个应答
- 生产者会负责接收应答的消息,核心的目的是来确认发送的消息是否发送到Broker,这也是RabbitMQ
消息可靠性投递的完美设计。
二、Confirm交换图
生产者在发送消息后,需要MQ代理来确认是否消息发送成功,那么它的交换图可以梳理为如下:
三、Confirm案例实战
要想实现生产者的Confirm确认消息的机制,那么就需要在生产者端的代码中开启确认消息机制,然后在Channel上来进行具体的监听,如果成功,就会返回监听成功的信息。当然这个过程很难同步的实现,它是基于异步的机制来进行实现的,其实这也是很好理解的。因为同步可能存在超时以及堵塞的情况。
3.1、生产者代码
代码语言:javascript复制package com.example.rabbitmq.confirm;
import com.rabbitmq.client.*;
import java.io.IOException;
public class ProducerConfirm
{
private static final String exchangeName="saas";
private static final String routyKey="saas";
public static void main(String[] args) throws Exception
{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("101.43.158.84");
connectionFactory.setPort(5672);
connectionFactory.setUsername("wuya");
connectionFactory.setPassword("java");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//指定消息的投递模式:消息的确认模式
channel.confirmSelect();
//通过channel来发送具体的数据信息
String msg = "producer send message";
channel.basicPublish(exchangeName, routyKey, null, msg.getBytes());
//添加一个消息确认监听
channel.addConfirmListener(new ConfirmListener()
{
//消息成功的返回信息
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException
{
System.out.println("message send success!");
}
//消息失败的返回机制
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException
{
System.out.println("message send failed!");
}
});
}
}
在如上的代码中,首先在channel中指定了消息确认,也就是channel.confirmSelect(),然后也添加了生产者端的应答监听机制,如果是成功,就会调用handleAck,如果是失败就会调用handleNack的方法。
3.2、消费者代码
代码语言:javascript复制package com.example.rabbitmq.confirm;
import com.rabbitmq.client.*;
public class ConsumerConfirm
{
private static final String EXCHANGE = "saas";
private static final String queueName="saas";
private static final String routingKey="saas";
public static void main(String[] args) throws Exception
{
try{
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("101.43.158.84");
connectionFactory.setPort(5672);
connectionFactory.setUsername("wuya");
connectionFactory.setPassword("java");
connectionFactory.setVirtualHost("/");
Connection connection=connectionFactory.newConnection();
Channel channel=connection.createChannel();
channel.exchangeDeclare(EXCHANGE,BuiltinExchangeType.FANOUT);
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,EXCHANGE,routingKey);
DefaultConsumer consumer=new DefaultConsumer(channel)
{
@Override
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte [] body) throws java.io.IOException
{
String message=new String(body);
System.out.println("receive message is :" message);
};
};
channel.basicConsume(queueName,consumer);
}catch (Exception e){
e.printStackTrace();
}
}
}
3.3、案例演示
在生产者端以及消费者端的代码中,可以很清晰的看到它的Exchange和routingKey都是一一对应的,那么也就是说生产者发送的消息到Exchange,然后在Exchange这层它的routingKey与Queue都是对应的,那么发送的消息是能够接收成功的。如下图显示的是生产者以及消费端的消息,具体如下:
当然在生产端开启消息的确认保障机制后,生产者就不能关闭它的连接数和channel,如果关闭的话就无法达到轮训监听确认消息的机制。