点击“蓝字”关注我们吧
前言
在上一章中,我们创建了一个工作队列,工作队列模式的设想是每一条消息只会被转发给一个消费者。本章将会讲解完全不一样的场景: 我们会把一个消息转发给多个消费者,这种模式称之为发布-订阅模式。
发布订阅模式
什么是发布订阅模式
在前两章节,我们往队列中发布消息或获取消息,然而,前面的讲解其实并不完整,接下来,是时候介绍完整的RabbitMq消息模型了。
回忆一下我们前两章指南中包含的内容:
- 一个生产者用以发送消息;
- 一个队列缓存消息;
- 一个消费者用以消费队列中的消息。
RabbitMq消息模式的核心思想是:一个生产者并不会直接往一个队列中发送消息,事实上,生产者根本不知道它发送的消息将被转发到哪些队列。
在订阅模式中,多了一个Exchange角色,而且过程略有变化,Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
功能介绍
功能描述:一个生产者 "P" 发送消息到 "X" 交换机,然后交换机将消息转发给绑定自己的队列中,最后一个队列可以有多个消费者消费消息,每个消息只能被消费一次。
P:生产者、X:交换机、红色:队列(可以缓存消息)、C:消费者
fanout交换机
订阅模式、路由模式、主题模式,它们三者的队列结构是一模一样的,区别就在于"交换机类型的不同",交换机的类型决定了工作模式的特点。订阅模式的交换机类型是fanout,路由模式的交换机类型是direct,主题模式的交换机类型是topic,所以学习RabbitMQ的各种工作模式,掌握各类型交换机的工作特点很重要。而fanout 交换机的特点就是跟广播一样,对消息不做任何选择地分发给所有绑定的队列。
代码演示
本文是基于SpringBoot框架去集成的RabbitMQ,所以最好会SpringBoot基础,再跟着本文一起搭建发布订阅模式队列Demo
创建一个简单的maven项目
导入依赖
首先在我的父工程 pom.xml 导入maven依赖
代码语言:javascript复制<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version></parent><dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.8</version> </dependency></dependencies>
生产者
生产者项目结构
pom文件
代码语言:javascript复制<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId></dependency>yml文件server: port: 8081spring: rabbitmq: ####连接地址 host: 192.168.137.5 ####端口号 port: 5672 ####账号 username: sunny ####密码 password: sunny ### 交换机 virtual-host: /sunny_vm
生产者配置类
代码语言:javascript复制@Configurationpublic class RabbitMqConfig { /** * 队列名称 */ public static final String BAIDU_SMS_QUEUE_NAME = "baidu_fanout_weather_queue"; public static final String SINA_QUEUE_NAME = "sina_fanout_weather_queue"; public static final String FANOUT_EXCHANGE = "sunny_exchange"; /** * 定义一个haiduQueue的队列 * Queue 可以有4个参数 * 1.队列名 * 2.durable 持久化消息队列 ,rabbitmq重启的时候不需要创建新的队列 默认true * 3.auto-delete 表示消息队列没有在使用时将被自动删除 默认是false * 4.exclusive 表示该消息队列是否只在当前connection生效,默认是false */ @Bean public Queue haiduQueue() { return new Queue(BAIDU_SMS_QUEUE_NAME); } @Bean public Queue sinaQueue() { return new Queue(SINA_QUEUE_NAME); } /** * 声明一个Fanout类型的交换机 * Fanout类型的交换机会将消息分发到所有的绑定队列,没有RoutingKey的概念 * @return */ @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUT_EXCHANGE); } /** * 将上面创建的百度队列绑定到交换机 * @param haiduQueue * @param fanoutExchange * @return */ @Bean public Binding baiduQueueExchange(Queue haiduQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(haiduQueue).to(fanoutExchange); } /** * 将上面创建的新浪队列绑定到交换机 * @param sinaQueue * @param fanoutExchange * @return */ @Bean public Binding sinaQueueExchange(Queue sinaQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(sinaQueue).to(fanoutExchange); }}
生产者发送消息
代码语言:javascript复制@RestControllerpublic class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/send") public void send() { String message = "上海天气预报,今日天气:晴"; rabbitTemplate.convertAndSend(FANOUT_EXCHANGE, null, message); System.out.println("发送成功" message); }}
生产者测试发送消息
打开浏览器,访问指定网址
代码语言:javascript复制http://localhost:8081/send
登陆Mangerment界面,可以看到我们在配置文件中配置的交换机名称,SpringBoot自动在RabbitMQ里面,已经帮我们创建好了,且交换机的类型为fanout类型。
我们还可以点击交换机的名称,然后看到交换机绑定队列的关系图等。
然后可以看到,我绑定交换机的两个队列,分别都积压着一条消息没有被消费掉
消费者
消费者项目结构
yml文件
代码语言:javascript复制server: port: 8080 spring: rabbitmq: ####连接地址 host: 192.168.137.5 ####端口号 port: 5672 ####账号 username: sunny ####密码 password: sunny ### 交换机 virtual-host: /sunny_vm
新建2个消费者,监听不同的队列
代码语言:javascript复制@Component@RabbitListener(queues = "baidu_fanout_weather_queue")public class BaiduConsumerController { /** * @RabbitListener 和 @RabbitHandler 搭配使用 * 可以标注在类上面,需配合 @RabbitHandler 注解一起使用 * 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理, * 具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型 **/ @RabbitHandler public void accept(String message) { System.out.println("百度天气预报接受消息:" message "成功"); }}@Component@RabbitListener(queues = "sina_fanout_weather_queue")public class SinaConsumerController { /** * @RabbitListener 和 @RabbitHandler 搭配使用 * 可以标注在类上面,需配合 @RabbitHandler 注解一起使用 * 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理, * 具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型 **/ @RabbitHandler public void accept(String message) { System.out.println("新浪天气预报接受消息:" message "成功"); }}
启动消费者项目,项目启动后会自动消费消息
队列中积压的消息被成功消费
到此SpringBoot整合RabbitMQ实现发布订阅模式Demo就结束拉
总结
1. 交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。 2. 发布订阅模式与工作队列模式的区别: a:工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机 b:发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机) c:发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机 适用场景: 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
我是黎明大大,我知道我没有惊世的才华,也没有超于凡人的能力,但毕竟我还有一个不屈服,敢于选择向命运冲锋的灵魂,和一个就是伤痕累累也要义无反顾走下去的心。
如果您觉得本文对您有帮助,还请关注点赞一波,后期将不间断更新更多技术文
●RabbitMQ六种队列模式之工作队列模式
●RabbitMQ六种队列模式之简单队列模式
●深入理解Redis的持久化机制
●Redis集群搭建及原理解剖
●我们所了解的Redis分布式锁真的就万无一失吗?
●Spring5.0源码深度解析之Spring是如何利用三级缓存解决循环依赖的问题