前言
假设我们现在有这么一个场景,我们的消费端由于某些原因导致全部宕机等不可用,导致RabbitMQ服务器队列中存储了大量消息未被消费掉,那么等恢复消费端服务器后,就会有巨大量的消息全部推送过来,但是我们单个客户端无法同事处理这么多消息,就是导致消费端一些不可预测错误,甚至又会重复发生宕机,所以在实际业务场景中,限流保护还是非常重要的。
消费端限流
什么是消费端限流
rabbitMQ 提供了一种 qos (服务质量保证)功能,规定消费端每次只能接收多少条消息,消费端在没有将接收到的消息全部确认之前,是不会在进行接收消息的。
代码演示
本文是基于SpringBoot框架去集成的RabbitMQ,所以最好会SpringBoot基础,再跟着本文一起搭建消费端限流Demo
创建一个简单的maven项目
导入依赖
首先在我的父工程 pom.xml 导入maven依赖
代码语言:javascript复制<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version></parent>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.8</version> </dependency></dependencies>
生产者
生产者项目结构
pom文件
代码语言:javascript复制<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId></dependency>
yml文件
代码语言:javascript复制server: port: 8081
spring: rabbitmq: ####连接地址 host: 192.168.137.5 ####端口号 port: 5672 ####账号 username: sunny ####密码 password: sunny ### 交换机 virtual-host: /sunny_vm # publisher-confirms和publisher-returns是对于消息生产端的配置 publisher-confirms: true # 开启发送消息确认 对应RabbitTemplate.ConfirmCallback接口 publisher-returns: true # 开启发送消息失败返回 对应RabbitTemplate.ReturnCallback接口
生产者配置类
@Configurationpublic class RabbbitMqConfig {
public static final String EXCHANGE_NAME = "sunny_prefetch_exchange";
/** * prefetch队列名称 */ public static final String PREFETCH_QUEUE_NAME = "routing_prefetch_queue";
/** * prefetch 路由key名称 */ public static final String PREFETCH_ROUTING_KEY = "routing.prefetch.key";
/** * 声明prefetch队列 * * @return */ @Bean public Queue prefetchQueue() { return new Queue(PREFETCH_QUEUE_NAME, true); }
/** * 声明一个Direct类型的交换机 * * @return */ @Bean public DirectExchange directExchange() { return new DirectExchange(EXCHANGE_NAME); }
/** * 将上面的prefetch队列绑定到Direct交换机 * * @param prefetchQueue * @param directExchange * @return */ @Bean public Binding configQueueDirectExchange(Queue prefetchQueue, DirectExchange directExchange) { return BindingBuilder.bind(prefetchQueue).to(directExchange).with(PREFETCH_ROUTING_KEY); }}
RabbitMQACK配置文件
代码语言:javascript复制/** * @Description 消息发送确认 * <p> * ConfirmCallback 只确认消息是否正确到达 Exchange 中 * ReturnCallback 消息没有正确到达队列时触发回调,如果正确到达队列不执行 * <p> * 1. 如果消息没有到exchange,则confirm回调,ack=false * 2. 如果消息到达exchange,则confirm回调,ack=true * 3. exchange到queue成功,则不回调return * 4. exchange到queue失败,则回调return * */@Slf4j@Configurationpublic class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired private RabbitTemplate rabbitTemplate;
@PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); // 指定 ConfirmCallback rabbitTemplate.setReturnCallback(this); // 指定 ReturnCallback }
/** * 如果消息到达 exchange, 则 confirm 回调, ack = true * 如果消息不到达 exchange, 则 confirm 回调, ack = false * 需要设置spring.rabbitmq.publisher-confirms=true * * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("消息是否到达Exchange:{}", ack == true ? "消息成功到达Exchange" : "消息到达Exchange失败"); if (!ack) { log.info("消息到达Exchange失败原因:{}", cause);
// 根据业务逻辑实现消息补偿机制
} }
/** * exchange 到达 queue, 则 returnedMessage 不回调 * exchange 到达 queue 失败, 则 returnedMessage 回调 * 需要设置spring.rabbitmq.publisher-returns=true * * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息报文:{}", new String(message.getBody())); log.info("消息编号:{}", replyCode); log.info("描述:{}", replyText); log.info("交换机名称:{}", exchange); log.info("路由名称:{}", routingKey);
// 根据业务逻辑实现消息补偿机制
}}
生产者发送消息
代码语言:javascript复制@Slf4j@RestControllerpublic class ProducerController {
@Autowired private RabbitTemplate rabbitTemplate;
@GetMapping("/send") public void send() { for (int i = 0; i < 9; i ) { String msg = "发送第" i "消息"; rabbitTemplate.convertAndSend(RabbbitMqConfig.EXCHANGE_NAME, RabbbitMqConfig.PREFETCH_ROUTING_KEY, msg); log.info("发送下标消息{}", i); } }}
生产者测试发送消息
打开浏览器,访问指定网址
http://localhost:8081/send
登陆Mangerment界面,可以看到我们在配置文件中配置的交换机名称
SpringBoot自动在RabbitMQ里面,已经帮我们创建好了,且交换机的类型为direct类型。
我们还可以点击交换机的名称,然后看到交换机绑定队列的关系图等。
然后可以看到,我绑定交换机的队列,积存着9条消息
消费者
消费者项目结构
pom文件
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId></dependency>
yml文件
server: port: 8080
spring: rabbitmq: ####连接地址 host: 192.168.137.5 ####端口号 port: 5672 ####账号 username: sunny ####密码 password: sunny ### 交换机 virtual-host: /sunny_vm #这个配置是针对消息消费端的配置 listener: simple: acknowledge-mode: manual # 开启 ack 手动确认 prefetch: 3 #消费端限流/一次就接收三条,需等待3条消费完毕,才继续接收消息
新建消费者,监听的队列
代码语言:javascript复制@Slf4j@Componentpublic class ConsumerController {
@RabbitHandler @RabbitListener(queues = {"routing_prefetch_queue"}) public void consumer(Channel channel, Message message) throws IOException, InterruptedException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
log.info("时间:{},成功接收消息:{}", System.currentTimeMillis(), deliveryTag);
if (deliveryTag % 3 == 0) {
//第二个参数--- true:批量接收数据,false:逐条接收数据 channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
//模拟业务限流,接收消息等于3条消息,则阻塞3秒种,再次进行接收消息 Thread.sleep(3000); } }}
启动消费者项目,项目启动后会自动消费消息
队列中积压的消息被成功消费
到此SpringBoot整合RabbitMQ实现消费端限流Demo就结束拉
总结
1.为了防止消费端某时刻同时接收大量的消息导致不可预测情况发生,我们可以在消费端添加限流处理,每次限制接收多少条消息。