RabbitMQ六种队列模式之主题模式

2021-03-08 12:48:07 浏览数 (1)

从前面几篇文章介绍了Exchange模式从fanout 到 direct 的转变过程,在fanout时,我们只能进行简单的广播,对应类型比较单一,使用direct后,消费者则可以进行一定程度的选择,但是direct 还是有局限性,路由不支持多个条件,简单理解就是direct交换机一旦与队列进行绑定那么就绑定了,无法像topic模式这么灵活,既然说到topic绑定灵活,我们接下来就来看看topic是怎么个灵活法。

发布订阅模式

什么是主题模式

主题模式与路由键模式类似,都是可以根据 RoutingKey把消息路由到不同的队列中,只不过主题模式的交换机可以让队列在绑定RoutingKey的时候使用通配符,前面我们所了解到的RoutingKey一般都是由一个或多个单词组成,单词之间以(符号点)进行分割,例如:“sunny.topic.weather”

Topic模式的通配符是怎么回事呢?

Topic通配符模式,其实也可以称之为模糊匹配路由键模式,类似于SQL中的 "=" 和 "like" 的区别,那么通配符的规则分为两种 "*" 和 "#"

  • "*": *号代表只能匹配任意一个单词,例如:sunny.* 能够匹配到 sunny.topic
  • "#":#号代表可以匹配一个或多个单词,例如:sunny.# 能够匹配到 sunny.topic.weather

功能介绍

如上图举例说明:

在这个例子中,我们将发送描述动物的消息。这些消息的路由关键字由三个单词(两个点)组成。路由关键字的第一个单词描述速度,第二个描述颜色,第三个描述物种

建立三个绑定: Q1绑定关键字 "*.orange." ,Q2绑定关键字 "*.*.rabbit" 和 "lazy.#"

这些绑定可以概括为:

  • Q1对所有橙色的动物感兴趣。
  • Q2希望接收所有关于兔子和一切有关懒惰的动物的消息。

现在发送几个不同的routing_key消息:

(1)“quick.orange.rabbit”的消息将被传递到两个队列。

(2)“lazy.orange.elephant”消息也被发送到两个队列。

(3)“quick.orange.fox”消息只会到第一队列,

(4)“lazy.brown.fox”消息只会到第二个。

(5)“lazy.pink.rabbit”消息只会被发送给第二队列一次,即使它匹配两个绑定。

(6)“quick.brown.fox”消息不匹配任何绑定,所以它会被丢弃。

(7)“orange”或“quick.orange.male.rabbit”,这些消息将不会匹配任何绑定,将会被丢弃。

(8)“lazy.orange.male.rabbit”消息即使有四个词,但匹配最后一个绑定,将被发送到

第二个队列(因为第一个绑定使用的都是*,只能匹配一个完整的单词,而第二个绑定后面是#,可以匹配零个或多个完整的单词).

当队列以‘#’作为绑定关键字,该队列将会接收所有的消息,和fanout一样,忽略消息的绑定关键字。

当绑定关键字中没有‘*’和‘#’时,topic类型交换机就和direct类型交换机一样。

代码演示

本文是基于SpringBoot框架去集成的RabbitMQ,所以最好会SpringBoot基础,再跟着本文一起搭建主题队列Demo

创建一个简单的maven项目

导入依赖

首先在我的父工程 pom.xml 导入maven依赖

代码语言:javascript复制
<parent>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-parent</artifactId>    <version>2.0.0.RELEASE</version></parent><dependencies>     <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-amqp</artifactId>    </dependency>    <dependency>        <groupId>org.projectlombok</groupId>        <artifactId>lombok</artifactId>        <version>1.18.8</version>    </dependency></dependencies>

生产者

生产者项目结构

pom文件

代码语言:javascript复制
<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-web</artifactId></dependency>

yml文件

代码语言:javascript复制
server:  port: 8081spring:  rabbitmq:    ####连接地址    host: 192.168.137.5    ####端口号    port: 5672    ####账号    username: sunny    ####密码    password: sunny    ### 交换机    virtual-host: /sunny_vm

生产者配置类

代码语言:javascript复制
@Configurationpublic class RabbbitMqConfig {    public static final String EXCHANGE_NAME = "sunny_topic_exchange";    /**     * SMS队列名称     */    public static final String SMS_QUEUE_NAME = "sms_topic_queue";    /**     * email队列名称     */    public static final String EMAIL_QUEUE_NAME = "email_topic_queue";    /**     *  #:匹配sunny.sms 后面所有的配置     */    public static final String SMS_ROUTING_KEY = "sunny.sms.#";    /**     *     * *:匹配sunny.email.任何值都可以查询出来(只占一个位置)  如果是sunny.topic.a.a.com 中间出现了两个则无法进行匹配了     * 例:sunny.email.163.com 或 sunny.email.qq.com     */    public static final String EMAIL_ROUTING_KEY = "sunny.email.*.com";    /**     * 声明短信队列     * @return     */    @Bean    public Queue smsQueue() {        return new Queue(SMS_QUEUE_NAME);    }    /**     * 声明email队列     * @return     */    @Bean    public Queue emailQueue() {        return new Queue(EMAIL_QUEUE_NAME);    }    /**     * 声明一个Topic类型的交换机     * @return     */    @Bean    public TopicExchange topicExchange() {        return new TopicExchange(EXCHANGE_NAME);    }    /**     * 将上面的sms队列绑定到Topic交换机     * @param smsQueue     * @param topicExchange     * @return     */    @Bean    public Binding smsTopicExchangeBinding(Queue smsQueue, TopicExchange topicExchange) {        return BindingBuilder.bind(smsQueue).to(topicExchange).with(SMS_ROUTING_KEY);    }    /**     * 将上面的email队列绑定到Direct交换机     * @param emailQueue     * @param topicExchange     * @return     */    @Bean    public Binding emailTopicExchangeBinding(Queue emailQueue, TopicExchange topicExchange) {        return BindingBuilder.bind(emailQueue).to(topicExchange).with(EMAIL_ROUTING_KEY);    }}

生产者发送消息

代码语言:javascript复制
@Slf4j@RestControllerpublic class ProducerController {    @Autowired    private RabbitTemplate rabbitTemplate;    @GetMapping("/send")    public void send() {        //发送消息到邮件队列中        rabbitTemplate.convertAndSend(RabbbitMqConfig.EXCHANGE_NAME, "sunny.email.163.com", "发送邮件到163邮箱");        rabbitTemplate.convertAndSend(RabbbitMqConfig.EXCHANGE_NAME, "sunny.email.qq.com", "发送邮件到qq邮箱");        rabbitTemplate.convertAndSend(RabbbitMqConfig.EXCHANGE_NAME, "sunny.email.@.wb.com", "发送邮件到wb邮箱");        //发送消息到短信队列中        rabbitTemplate.convertAndSend(RabbbitMqConfig.EXCHANGE_NAME, "sunny.sms.yd.ak", "发送移动短信消息");        rabbitTemplate.convertAndSend(RabbbitMqConfig.EXCHANGE_NAME, "sunny.sms.lt.ng", "发送联通短信消息");        rabbitTemplate.convertAndSend(RabbbitMqConfig.EXCHANGE_NAME, "sunny.sms.dx.ap", "发送电信短信消息");        log.info("生产者发送消息成功");    }}

生产者测试发送消息

打开浏览器,访问指定网址

代码语言:javascript复制
http://localhost:8081/send

登陆Mangerment界面,可以看到我们在配置文件中配置的交换机名称。

SpringBoot自动在RabbitMQ里面,已经帮我们创建好了,且交换机的类型为topic类型。

我们还可以点击交换机的名称,然后看到交换机绑定队列的关系图等。

然后可以看到,我绑定交换机的两个队列,分别都积压着消息没有被消费掉,但是估计有小伙伴发现到问题了,我生产者前面是分别往两个队列发送了3条消息,rabbitMQ控制台显示我的email的topic队列只有两条消息,这是因为我的有一条消息发送email队列路由key不匹配原因导致的,主要测试主题模式的模糊匹配机制哈。

消费者

消费者项目结构

yml文件

代码语言:javascript复制
server:  port: 8080spring:  rabbitmq:    ####连接地址    host: 192.168.137.5    ####端口号    port: 5672    ####账号    username: sunny    ####密码    password: sunny    ### 交换机    virtual-host: /sunny_vm

新建2个消费者,监听不同的队列

代码语言:javascript复制
@Component@RabbitListener(queues = {"email_topic_queue"})public class EmailConsumerController {    @RabbitHandler    public void one(String message) {        System.out.println("邮件队列接收到消息了!!!"   message);    }}@Component@RabbitListener(queues = {"sms_topic_queue"})public class SmsConsumerController {    @RabbitHandler    public void one(String message) {        System.out.println("短信队列接收到消息了!!!"   message);    }}

启动消费者项目,项目启动后会自动消费消息

队列中积压的消息被成功消费

到此SpringBoot整合RabbitMQ实现主题模式代码Demo就结束拉

总结

1、Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置routing key 的时候可以使用通配符,显得更加灵活。 2、需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。 3、"#”表示0个或若干个关键字,“*”表示一个关键字。如“sunny.*”能与“sunny.sms”匹配,无法与“sunny.sms.yd”匹配;但是“sunny.#”能与上述两者匹配。 4、如果 exchange 没有发现能够与 routeKey 匹配的 Queue,则会抛弃此消息。我是黎明大大,我知道我没有惊世的才华,也没有超于凡人的能力,但毕竟我还有一个不屈服,敢于选择向命运冲锋的灵魂,和一个就是伤痕累累也要义无反顾走下去的心。

0 人点赞