从前面几篇文章介绍了Exchange模式从fanout 到 direct 的转变过程,在fanout时,我们只能进行简单的广播,对应类型比较单一,使用direct后,消费者则可以进行一定程度的选择,但是direct 还是有局限性,路由不支持多个条件,简单理解就是direct交换机一旦与队列进行绑定那么就绑定了,无法像topic模式这么灵活,既然说到topic绑定灵活,我们接下来就来看看topic是怎么个灵活法。
发布订阅模式
什么是主题模式
主题模式与路由键模式类似,都是可以根据 RoutingKey把消息路由到不同的队列中,只不过主题模式的交换机可以让队列在绑定RoutingKey的时候使用通配符,前面我们所了解到的RoutingKey一般都是由一个或多个单词组成,单词之间以(符号点)进行分割,例如:“sunny.topic.weather”
Topic模式的通配符是怎么回事呢?
Topic通配符模式,其实也可以称之为模糊匹配路由键模式,类似于SQL中的 "=" 和 "like" 的区别,那么通配符的规则分为两种 "*" 和 "#"
- "*": *号代表只能匹配任意一个单词,例如:sunny.* 能够匹配到 sunny.topic
- "#":#号代表可以匹配一个或多个单词,例如:sunny.# 能够匹配到 sunny.topic.weather
功能介绍
如上图举例说明:
在这个例子中,我们将发送描述动物的消息。这些消息的路由关键字由三个单词(两个点)组成。路由关键字的第一个单词描述速度,第二个描述颜色,第三个描述物种
建立三个绑定: Q1绑定关键字 "*.orange." ,Q2绑定关键字 "*.*.rabbit" 和 "lazy.#"
这些绑定可以概括为:
- Q1对所有橙色的动物感兴趣。
- Q2希望接收所有关于兔子和一切有关懒惰的动物的消息。
现在发送几个不同的routing_key消息:
(1)“quick.orange.rabbit”的消息将被传递到两个队列。
(2)“lazy.orange.elephant”消息也被发送到两个队列。
(3)“quick.orange.fox”消息只会到第一队列,
(4)“lazy.brown.fox”消息只会到第二个。
(5)“lazy.pink.rabbit”消息只会被发送给第二队列一次,即使它匹配两个绑定。
(6)“quick.brown.fox”消息不匹配任何绑定,所以它会被丢弃。
(7)“orange”或“quick.orange.male.rabbit”,这些消息将不会匹配任何绑定,将会被丢弃。
(8)“lazy.orange.male.rabbit”消息即使有四个词,但匹配最后一个绑定,将被发送到
第二个队列(因为第一个绑定使用的都是*,只能匹配一个完整的单词,而第二个绑定后面是#,可以匹配零个或多个完整的单词).
当队列以‘#’作为绑定关键字,该队列将会接收所有的消息,和fanout一样,忽略消息的绑定关键字。
当绑定关键字中没有‘*’和‘#’时,topic类型交换机就和direct类型交换机一样。
代码演示
本文是基于SpringBoot框架去集成的RabbitMQ,所以最好会SpringBoot基础,再跟着本文一起搭建主题队列Demo
创建一个简单的maven项目
导入依赖
首先在我的父工程 pom.xml 导入maven依赖
代码语言:javascript复制<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version></parent><dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.8</version> </dependency></dependencies>
生产者
生产者项目结构
pom文件
代码语言:javascript复制<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId></dependency>
yml文件
server: port: 8081spring: rabbitmq: ####连接地址 host: 192.168.137.5 ####端口号 port: 5672 ####账号 username: sunny ####密码 password: sunny ### 交换机 virtual-host: /sunny_vm
生产者配置类
@Configurationpublic class RabbbitMqConfig { public static final String EXCHANGE_NAME = "sunny_topic_exchange"; /** * SMS队列名称 */ public static final String SMS_QUEUE_NAME = "sms_topic_queue"; /** * email队列名称 */ public static final String EMAIL_QUEUE_NAME = "email_topic_queue"; /** * #:匹配sunny.sms 后面所有的配置 */ public static final String SMS_ROUTING_KEY = "sunny.sms.#"; /** * * *:匹配sunny.email.任何值都可以查询出来(只占一个位置) 如果是sunny.topic.a.a.com 中间出现了两个则无法进行匹配了 * 例:sunny.email.163.com 或 sunny.email.qq.com */ public static final String EMAIL_ROUTING_KEY = "sunny.email.*.com"; /** * 声明短信队列 * @return */ @Bean public Queue smsQueue() { return new Queue(SMS_QUEUE_NAME); } /** * 声明email队列 * @return */ @Bean public Queue emailQueue() { return new Queue(EMAIL_QUEUE_NAME); } /** * 声明一个Topic类型的交换机 * @return */ @Bean public TopicExchange topicExchange() { return new TopicExchange(EXCHANGE_NAME); } /** * 将上面的sms队列绑定到Topic交换机 * @param smsQueue * @param topicExchange * @return */ @Bean public Binding smsTopicExchangeBinding(Queue smsQueue, TopicExchange topicExchange) { return BindingBuilder.bind(smsQueue).to(topicExchange).with(SMS_ROUTING_KEY); } /** * 将上面的email队列绑定到Direct交换机 * @param emailQueue * @param topicExchange * @return */ @Bean public Binding emailTopicExchangeBinding(Queue emailQueue, TopicExchange topicExchange) { return BindingBuilder.bind(emailQueue).to(topicExchange).with(EMAIL_ROUTING_KEY); }}
生产者发送消息
@Slf4j@RestControllerpublic class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/send") public void send() { //发送消息到邮件队列中 rabbitTemplate.convertAndSend(RabbbitMqConfig.EXCHANGE_NAME, "sunny.email.163.com", "发送邮件到163邮箱"); rabbitTemplate.convertAndSend(RabbbitMqConfig.EXCHANGE_NAME, "sunny.email.qq.com", "发送邮件到qq邮箱"); rabbitTemplate.convertAndSend(RabbbitMqConfig.EXCHANGE_NAME, "sunny.email.@.wb.com", "发送邮件到wb邮箱"); //发送消息到短信队列中 rabbitTemplate.convertAndSend(RabbbitMqConfig.EXCHANGE_NAME, "sunny.sms.yd.ak", "发送移动短信消息"); rabbitTemplate.convertAndSend(RabbbitMqConfig.EXCHANGE_NAME, "sunny.sms.lt.ng", "发送联通短信消息"); rabbitTemplate.convertAndSend(RabbbitMqConfig.EXCHANGE_NAME, "sunny.sms.dx.ap", "发送电信短信消息"); log.info("生产者发送消息成功"); }}
生产者测试发送消息
打开浏览器,访问指定网址
http://localhost:8081/send
登陆Mangerment界面,可以看到我们在配置文件中配置的交换机名称。
SpringBoot自动在RabbitMQ里面,已经帮我们创建好了,且交换机的类型为topic类型。
我们还可以点击交换机的名称,然后看到交换机绑定队列的关系图等。
然后可以看到,我绑定交换机的两个队列,分别都积压着消息没有被消费掉,但是估计有小伙伴发现到问题了,我生产者前面是分别往两个队列发送了3条消息,rabbitMQ控制台显示我的email的topic队列只有两条消息,这是因为我的有一条消息发送email队列路由key不匹配原因导致的,主要测试主题模式的模糊匹配机制哈。
消费者
消费者项目结构
yml文件
代码语言:javascript复制server: port: 8080spring: rabbitmq: ####连接地址 host: 192.168.137.5 ####端口号 port: 5672 ####账号 username: sunny ####密码 password: sunny ### 交换机 virtual-host: /sunny_vm
新建2个消费者,监听不同的队列
@Component@RabbitListener(queues = {"email_topic_queue"})public class EmailConsumerController { @RabbitHandler public void one(String message) { System.out.println("邮件队列接收到消息了!!!" message); }}@Component@RabbitListener(queues = {"sms_topic_queue"})public class SmsConsumerController { @RabbitHandler public void one(String message) { System.out.println("短信队列接收到消息了!!!" message); }}
启动消费者项目,项目启动后会自动消费消息
队列中积压的消息被成功消费
到此SpringBoot整合RabbitMQ实现主题模式代码Demo就结束拉
总结
1、Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置routing key 的时候可以使用通配符,显得更加灵活。 2、需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。 3、"#”表示0个或若干个关键字,“*”表示一个关键字。如“sunny.*”能与“sunny.sms”匹配,无法与“sunny.sms.yd”匹配;但是“sunny.#”能与上述两者匹配。 4、如果 exchange 没有发现能够与 routeKey 匹配的 Queue,则会抛弃此消息。我是黎明大大,我知道我没有惊世的才华,也没有超于凡人的能力,但毕竟我还有一个不屈服,敢于选择向命运冲锋的灵魂,和一个就是伤痕累累也要义无反顾走下去的心。