接着上篇分布式--RabbitMQ入门
一、SpringBoot中使用RabbitMQ
1. 导入依赖
代码语言:javascript复制 <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. yml配置
代码语言:javascript复制spring:
rabbitmq:
host: 192.168.42.4
port: 5672
username: aruba
password: aruba
virtual-host: /
listener:
direct:
acknowledge-mode: manual # 手动ack
simple:
prefetch: 1 # 流控
concurrency: 10 # 多线程监控
3. 配置交换机和队列
代码语言:javascript复制@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "MY-MQ-EX";
public static final String QUEUE_NAME = "MY-MQ-QUEUE";
public static final String ROUTING_KEY = "key.#";
/**
* 注入交换机
*
* @return
*/
@Bean
public Exchange exchangeProvider() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
}
/**
* 注入队列
*
* @return
*/
@Bean
public Queue queueProvider() {
return QueueBuilder.durable(QUEUE_NAME).build();
}
/**
* 注入交换机队列绑定关系
*
* @return
*/
@Bean
public Binding bootBinding(Exchange exchangeProvider, Queue queueProvider) {
return BindingBuilder.bind(queueProvider).to(exchangeProvider).with(ROUTING_KEY).noargs();
}
}
4. 发送消息
SpringBoot中使用RabbitTemplate
自动注入,即可发送消息,并对方法都进行了封装
@SpringBootTest
class SpringbootRabbitmqApplicationTests {
@Autowired
public RabbitTemplate rabbitTemplate;
@Test
void send() {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "key.send", "发送消息");
}
/**
* 携带信息的消息
*/
@Test
void sendWithProps() {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
"key.send", "发送消息", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());
return message;
}
});
}
}
5. 订阅消息
在方法上使用@RabbitListener
注解,即可指定订阅队列。
入参添加Channel
,就可以和之前一样发送ack
。
将消息封装成了Message
,可以获取其携带信息。
@Component
public class MQListener {
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
public void consume(String msg, Channel channel, Message message) throws IOException {
System.out.println("队列的消息为:" msg);
String correlationId = message.getMessageProperties().getCorrelationId();
System.out.println("唯一标识为:" correlationId);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
运行结果:
二、消息可靠性
由于RabbitMQ在发送消息和订阅消息时,都是通过网络传输,其间必然会出现由网络问题产生的消息丢失情况,要保证消息的可靠性从下面四点出发:
- 保证消息发送到交换机
- 保证消息路由到队列
- 保证队列中消息的持久化
- 保证消费者正常消费消息
1. Client-API方式
1.1 保证消息发送到交换机
Publisher Confirms就是为了保证消息发送到交换机的机制,一般使用异步的方式:
代码语言:javascript复制 //4. 开启confirm
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息成功发送到交换机");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("没有送达交换机");
}
});
1.2 保证消息路由到队列
addReturnListener
方法可以确认消息是否路由到了队列,如果回调了说明没有路由到队列
发送消息时,指定mandatory
参数为true
//5. 设置return回调,确认消息是否路由到了队列
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("交换机没有路由到队列");
}
});
//参数: 交换机 routing-Key mandatory 消息其他参数 消息
channel.basicPublish("", QUEUE_NAME, true, null, message.getBytes());
1.3 保证队列中消息的持久化
首先保证队列的持久化,再保证消息的持久化
代码语言:javascript复制 //3. 构建队列 参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//6. 发送消息
String message = "hello confirm";
AMQP.BasicProperties porps = new AMQP.BasicProperties().builder()
.deliveryMode(2) //2:消息持久化 1: 不持久化
.build();
//参数: 交换机 routing-Key mandatory 消息其他参数 消息
channel.basicPublish("", QUEUE_NAME, true, porps, message.getBytes());
1.4 保证消费者正常消费消息
保证消费者正常消费消息只需要手动ack
即可,生产者完整代码:
public class Publisher {
private static final String QUEUE_NAME = "confirm";
@Test
public void publisher() throws Exception {
//1. 获取连接对象
Connection connection = RBConnectionUtil.getConnection();
//2. 创建信道
Channel channel = connection.createChannel();
//3. 构建队列 参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//4. 开启confirm
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息成功发送到交换机");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("没有送达交换机");
}
});
//5. 设置return回调,确认消息是否路由到了队列
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("交换机没有路由到队列");
}
});
//6. 发送消息
String message = "hello confirm";
AMQP.BasicProperties porps = new AMQP.BasicProperties().builder()
.deliveryMode(2) //2:消息持久化 1: 不持久化
.build();
//参数: 交换机 routing-Key mandatory 消息其他参数 消息
channel.basicPublish("", QUEUE_NAME, true, porps, message.getBytes());
}
}
2. SpringBoot方式
2.1 配置Confirm
yml中开启confirm
:
spring:
rabbitmq:
publisher-confirm-type: correlated
RabbitTemplate
设置回调:
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息成功送达到交换机");
} else {
System.out.println("消息没有送达到交换机");
}
}
});
2.2 配置Return
yml中开启return
:
spring:
rabbitmq:
publisher-returns: true
RabbitTemplate
设置回调:
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println(String.format("交换机:%s 路由消息失败", returned.getExchange()));
}
});
2.3 消息持久化
设置Message
的携带信息:
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
"key.send", "发送消息", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
});
完整代码:
代码语言:javascript复制 /**
* 携带信息的消息
*/
@Test
void sendWithProps() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息成功送达到交换机");
} else {
System.out.println("消息没有送达到交换机");
}
}
});
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println(String.format("交换机:%s 路由消息失败", returned.getExchange()));
}
});
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
"key.send", "发送消息", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());
return message;
}
});
}
三、死信队列
死信队列是存放本来应该死亡的消息的队列,用于对这些消息的特殊处理(如:重新入队、持久化到数据库),具体有以下几种消息会被存放进死信队列:
- 消费者拒绝的消息,并requeue设置为false(不重新入队列)
- 消息的生存时间到了,还在队列中的信息
- 队列设置了整体的消息生存时间,到了生存时间的消息
- 到达队列中消息最大数,再路由过来的消息
1. 构建交换机
死信队列需要一个死信交换机,并把正常消息的队列绑定死信交换机:
代码语言:javascript复制@Configuration
public class DeadLetterConfig {
public static final String NORMAL_EXCHANGE_NAME = "normal-ex";
public static final String NORMAL_QUEUE_NAME = "normal-queue";
public static final String NORMAL_ROUTING_KEY = "normal.#";
public static final String DEAD_EXCHANGE_NAME = "dead-ex";
public static final String DEAD_QUEUE_NAME = "dead-queue";
public static final String DEAD_ROUTING_KEY = "dead.#";
@Bean
public Exchange normalExchange() {
return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE_NAME).build();
}
@Bean
public Queue normalQueue() {
// 绑定死信交换机
return QueueBuilder.durable(NORMAL_QUEUE_NAME)
.deadLetterExchange(DEAD_EXCHANGE_NAME)
.deadLetterRoutingKey("dead.msg") //准备入死信队列的消息重新设置routin-key
.build();
}
@Bean
public Binding normalBinding(Exchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();
}
@Bean
public Exchange deadExchange() {
return ExchangeBuilder.topicExchange(DEAD_EXCHANGE_NAME).build();
}
@Bean
public Queue deadQueue() {
return QueueBuilder.durable(DEAD_QUEUE_NAME).build();
}
@Bean
public Binding deadBinding(Exchange deadExchange, Queue deadQueue) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
}
}
2. 死信队列的实现方式
2.1 拒绝消息入死信队列
对正常队列消息进行监听,来做相应的处理,首先是拒绝消息,并且要把requeue
设为false
:
@Component
public class DeadListener {
@RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE_NAME)
public void normalListener(Message msg, Channel channel) throws IOException {
System.out.println("接收到正常队列消息:" new String(msg.getBody()));
channel.basicReject(msg.getMessageProperties().getDeliveryTag(), false);
// channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);
}
}
尝试发送一个消息:
代码语言:javascript复制 @Test
public void sendNormal() {
rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE_NAME, "normal.msg", "哈喽");
}
运行结果:
2.2 消息生存时间
发送消息时,通过消息的额外参数MessageProperties
的setExpiration
方法设置过期时间:
@Test
public void sendExpire() {
rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE_NAME,
"normal.msg", "哈喽",
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 该消息10s后过期
message.getMessageProperties().setExpiration("10000");
return message;
}
});
}
记得把上面消息的监听注释掉,否则会消费消息
运行结果:
2.3 队列消息的整体生存时间
管理页面把之前的正常队列删除,在重新创建时,为正常队列设置ttl
:
设置ttl
:
@Bean
public Queue normalQueue() {
// 绑定死信交换机
return QueueBuilder.durable(NORMAL_QUEUE_NAME)
.ttl(5000) // 整体消息过期时间为5s
.deadLetterExchange(DEAD_EXCHANGE_NAME)
.deadLetterRoutingKey("dead.msg") // 准备入死信队列的消息重新设置routin-key
.build();
}
发送正常消息,运行结果:
2.4 达到队列最大数
同样先删除正常队列,后调用maxLength
为队列设置最大消息数:
@Bean
public Queue normalQueue() {
// 绑定死信交换机
return QueueBuilder.durable(NORMAL_QUEUE_NAME)
// .ttl(5000) // 整体消息过期时间为5s
.maxLength(1) // 设置消息最大数
.deadLetterExchange(DEAD_EXCHANGE_NAME)
.deadLetterRoutingKey("dead.msg") // 准备入死信队列的消息重新设置routin-key
.build();
}
发送两次正常消息,运行结果:
四、延迟交换机
死信队列的问题:由于死信队列只会监听队列头的过期时间,一旦队列头的消息过期时间比后面排队的消息过期时间长,那么后面消息的过期时间并不会生效,而是等待队列头的过期时间到了后,才一并进入死信队列
删除正常队列,恢复配置:
代码语言:javascript复制 @Bean
public Queue normalQueue() {
// 绑定死信交换机
return QueueBuilder.durable(NORMAL_QUEUE_NAME)
// .ttl(5000) // 整体消息过期时间为5s
// .maxLength(1) // 设置消息最大数
.deadLetterExchange(DEAD_EXCHANGE_NAME)
.deadLetterRoutingKey("dead.msg") // 准备入死信队列的消息重新设置routin-key
.build();
}
发送两次消息,第一次过期时间为30s,第二次为2s:
代码语言:javascript复制 @Test
public void sendExpire30() {
rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE_NAME,
"normal.msg", "哈喽",
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("30000");
return message;
}
});
}
@Test
public void sendExpire2() {
rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE_NAME,
"normal.msg", "哈喽",
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("2000");
return message;
}
});
}
结果,过了几秒后,队列中还是两个消息:
解决方法:根据时间创建多个队列或者使用延迟交换机
延迟交换机是一个插件,默认并不带,原理就是将消息暂时放在交换机中,由交换机根据消息过期时间的先后来路由到队列,缺点:由于消息在交换机中,重启会导致消息的丢失
1. 插件下载和使用
根据自己的RabbitMQ版本进行下载:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/
代码语言:javascript复制wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.17/rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez
mv rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez /usr/local/rabbitmq/rabbitmq_server-3.8.35/plugins
启动插件:
代码语言:javascript复制cd /usr/local/rabbitmq/rabbitmq_server-3.8.35/sbin
./rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重启服务或系统后,多了一个x-delayed-message
的交换机类型:
2. 配置延迟交换机
使用CustomExchange
构造x-delayed-message
类型交换机,并使用其他参数x-delayed-type
指定使用哪种原型交换机类型,这边使用的是topic
:
@Configuration
public class DelayExchangeConfig {
public static final String EXCHANGE_NAME = "delay-exchange";
public static final String DELAY_QUEUE = "delay_queue";
public static final String DELAY_ROUTIN_KEY = "delay.#";
@Bean
public Exchange delayExchange() {
Map<String, Object> args = new HashMap<>();
// 使用哪种原型交换机类型
args.put("x-delayed-type", "topic");
Exchange exchange = new CustomExchange(EXCHANGE_NAME, "x-delayed-message", true, false, args);
return exchange;
}
@Bean
public Queue delayQueue() {
return QueueBuilder.durable(DELAY_QUEUE).build();
}
@Bean
public Binding delayBinding(Queue delayQueue, Exchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTIN_KEY).noargs();
}
}
3. 发送消息
MessageProperties
使用setDelay
方法为消息设置延迟:
@Test
public void sendDelay30() {
rabbitTemplate.convertAndSend(DelayExchangeConfig.EXCHANGE_NAME,
"delay.msg", "哈喽",
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(30000);
return message;
}
});
}
@Test
public void sendDelay5() {
rabbitTemplate.convertAndSend(DelayExchangeConfig.EXCHANGE_NAME,
"delay.msg", "哈喽",
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(5000);
return message;
}
});
}
消息在交换机进行等待后,首先入队列的为5秒延迟的,后面入队列的为30秒延迟的:
五、集群
1. 配置主机名
RabbitMQ集群的搭建要配置主机名:HOSTNAME
,先修改network
配置文件
vi /etc/sysconfig/network
追加HOSTNAME:
代码语言:javascript复制HOSTNAME=rabbit1
再修改hosts
文件:
vi /etc/hosts
追加内容:
代码语言:javascript复制192.168.42.4 rabbit1
重启系统后,RabbitMQ先前配置的管理账号会丢失,需要重新配置
2. 克隆虚拟机
2.1 从机主机名配置
克隆后,对从机进行主机名的配置,network
配置文件:
hosts
文件,中需要添加集群主节点的ip和hostname:
2.2 建立集群关联
启动RabbitMQ服务后,管理界面的节点会带上主机名:
接下来,配置从机加入到主节点集群中,执行以下命令即可:
代码语言:javascript复制cd /usr/local/rabbitmq/rabbitmq_server-3.8.35/sbin/
./rabbitmqctl stop_app
./rabbitmqctl reset
./rabbitmqctl join_cluster rabbit@rabbit1
./rabbitmqctl start_app
加入成功后,管理界面中就会出现多个节点:
3. 配置镜像模式
目前集群是普通模式,队列中的消息只会存在于一个节点上,而不会同步到其他队列,一旦该节点宕机,其他节点将无法访问消息。
镜像模式是指,集群中所有节点都有一份单独的拷贝,即使单一节点宕机,其他节点中依然存在消息的拷贝,这样才能实现高可用
在管理界面进行配置镜像策略:
新建一个队列,并查看详情:
项目地址:
https://gitee.com/aruba/rabbit-mqstudy.git