点击“蓝字”关注我们吧
前言
上文我们了解了 RabbitMQ 六种队列模式中的简单队列,代码也是非常的简单,比较容易理解。
但是简单队列有个缺点,简单队列是一一对应的关系,即点对点,一个生产者对应一个消费者,按照这个逻辑,如果我们有一些比较耗时的任务,也就意味着需要大量的时间才能处理完毕,显然简单队列模式并不能满足我们的工作需求,我们今天再来看看工作队列。
工作队列模式
什么是工作队列模式
工作队列:用来将耗时的任务分发给多个消费者(工作者)
主要解决问题:处理资源密集型任务,并且还要等他完成。有了工作队列,我们就可以将具体的工作放到后面去做,将工作封装为一个消息,发送到队列中,一个工作进程就可以取出消息并完成工作。如果启动了多个工作进程,那么工作就可以在多个进程间共享。
工作队列也称为公平性队列模式,怎么个说法呢?
循环分发,假如我们拥有两个消费者,默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消
费者,平均而言,每个消费者将获得相同数量的消息,这种分发消息的方式称为轮询。
功能介绍
功能描述:一个生产者 "P" 发送消息到 "Q" 队列 由多个消费者 "C" 轮询接受消息(注意一条消息只能被消费一次,不能被重复消费)
P:生产者、红色:队列(可以缓存消息)、C:消费者
代码演示
本文是基于SpringBoot框架去集成的RabbitMQ,所以最好会SpringBoot基础,再跟着本文一起大家工作队列Demo
创建一个简单的maven项目
导入依赖
首先在我的父工程 pom.xml 导入maven依赖
代码语言:javascript复制<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version></parent><dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.8</version> </dependency></dependencies>
生产者
生产者项目结构
pom文件
代码语言:javascript复制<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId></dependency>
yml文件
代码语言:javascript复制server: port: 8081spring: rabbitmq: ####连接地址 host: 192.168.137.5 ####端口号 port: 5672 ####账号 username: sunny ####密码 password: sunny ### 交换机 virtual-host: /sunny_vm
生产者配置类
代码语言:javascript复制@Componentpublic class RabbitMqConfig { /** * 队列名称 */ public static final String QUEUE_NAME = "sunny_work_queue"; @Bean public Queue simpleQueu() { return new Queue(QUEUE_NAME); }}
生产者发送消息
代码语言:javascript复制@RestControllerpublic class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/send") public void send() { for (int i = 0; i < 50; i ) { String message = "sunny发送爱心小礼包工作消息" i; rabbitTemplate.convertAndSend(QUEUE_NAME, message); System.out.println("生产者发送消息:" message "成功"); } }}
和简单队列一样,在工作队列中,发送消息的时候不指定交换机的名称,那么就会将消息发送到"默认交换机"上。默认的Exchange不进行Binding操作,任何发送到该Exchange的消息都会被转发到"Queue名字和Routing key相同的队列"中,如果vhost中不存在和Routing key同名的队列,则该消息会被抛弃。
这里我们在发送消息的时候设置的Routing key为"sunny_work_queue",那么就会发送到队列名为"sunny_work_queue"的队列上去。
生产者测试发送消息
打开浏览器,访问指定网址
代码语言:javascript复制http://localhost:8081/send
登陆Mangerment界面,可以看到队列中阻塞了50条消息未消费
消费者
消费者项目结构
yml文件
代码语言:javascript复制server: port: 8080 spring: rabbitmq: ####连接地址 host: 192.168.137.5 ####端口号 port: 5672 ####账号 username: sunny ####密码 password: sunny ### 交换机 virtual-host: /sunny_vm
新建2个消费者,监控的是同一个队列"sunny_simple_queue"
代码语言:javascript复制@Component@RabbitListener(queues = "sunny_work_queue")public class ConsumerOneController { /** * @RabbitListener 和 @RabbitHandler 搭配使用 * 可以标注在类上面,需配合 @RabbitHandler 注解一起使用 * 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理, * 具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型 **/ @RabbitHandler public void accept(String message) { System.out.println("消费者One接受消息:" message "成功"); }}@Component@RabbitListener(queues = "sunny_work_queue")public class ConsumerTwoController { /** * @RabbitListener 和 @RabbitHandler 搭配使用 * 可以标注在类上面,需配合 @RabbitHandler 注解一起使用 * 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理, * 具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型 **/ @RabbitHandler public void accept(String message) { System.out.println("消费者Two接受消息:" message "成功"); }}
启动消费者项目,项目启动后会自动消费消息
队列中积压的消息被成功消费
到此SpringBoot整合RabbitMQ实现工作队列代码Demo就结束拉
总结
轮询分发: 多个消费者监听了同一个队列,队列的数据会循环的给消费者分发数据 适用场景: 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
我是黎明大大,我知道我没有惊世的才华,也没有超于凡人的能力,但毕竟我还有一个不屈服,敢于选择向命运冲锋的灵魂,和一个就是伤痕累累也要义无反顾走下去的心。
●RabbitMQ六种队列模式之简单队列模式
●Redis主从架构的搭建
●深入理解Redis的持久化机制
●Spring5.0源码深度解析之Spring是如何利用三级缓存解决循环依赖的问题