RabbitMQ六种队列模式之路由模式

2021-03-09 12:05:57 浏览数 (1)

点击“蓝字”关注我们吧

前言

本文接着带大家伙了解RabbitMQ队列模式中的路由模式,其实只要看过我前面写的发布订阅模式的文章后,相信路由模式上手就非常 easy 了,唯一差距就是两个参数,exchange类型和 routingKey 。

路由模式

什么是路由模式

路由模式跟发布订阅模式类似,然后在订阅模式的基础上加上了类型,订阅模式是分发到所有绑定到交换机的队列,路由模式只分发到绑定在交换机上面指定路由键的队列

功能介绍

图解:

  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key
  • X:Exchange(交换机),接受生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
  • 红色方块:队列,用于存放消息
  • C1:消费者,其所在队列指定了需要 routing key 为 error的消息
  • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning的消息

Routing 路由模式

路由模式的交换机类型是Direct,Direct交换机的特点,就决定了路由模式的工作模式,即只有消息的 Routing key 与Binding key 相同时,交换机才会把消息发给该队列。

代码演示

本文是基于SpringBoot框架去集成的RabbitMQ,所以最好会SpringBoot基础,再跟着本文一起搭建路由队列Demo

创建一个简单的maven项目

导入依赖

首先在我的父工程 pom.xml 导入maven依赖

代码语言:javascript复制
<parent>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-parent</artifactId>    <version>2.0.0.RELEASE</version></parent><dependencies>     <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-amqp</artifactId>    </dependency>    <dependency>        <groupId>org.projectlombok</groupId>        <artifactId>lombok</artifactId>        <version>1.18.8</version>    </dependency></dependencies>

生产者

生产者项目结构

pom文件

代码语言:javascript复制
<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-web</artifactId></dependency>

yml文件

代码语言:javascript复制
server:  port: 8081spring:  rabbitmq:    ####连接地址    host: 192.168.137.5    ####端口号    port: 5672    ####账号    username: sunny    ####密码    password: sunny    ### 交换机    virtual-host: /sunny_vm

生产者配置类

代码语言:javascript复制
@Configurationpublic class RabbbitMqConfig {

    public static final String EXCHANGE_NAME = "sunny_routingkey_exchange";
    /**     * SMS队列名称     */    public static final String SMS_QUEUE_NAME = "sms_direct_routingkey_queue";
    /**     * email队列名称     */    public static final String EMAIL_QUEUE_NAME = "email_direct_routingkey_queue";

    /**     * sms 路由key名称     */    public static final String SMS_ROUTING_KEY = "sms_routing_key";
    /**     * email 路由key名称     */    public static final String EMAIL_ROUTING_KEY = "email_routing_key";
    /**     * 声明短信队列     *     * @return     */    @Bean    public Queue smsQueue() {        return new Queue(SMS_QUEUE_NAME, true);    }
    /**     * 声明email队列     *     * @return     */    @Bean    public Queue emailQueue() {        return new Queue(EMAIL_QUEUE_NAME, true);    }
    /**     * 声明一个Direct类型的交换机     *     * @return     */    @Bean    public DirectExchange directExchange() {        return new DirectExchange(EXCHANGE_NAME);    }
    /**     * 将上面的sms队列绑定到Direct交换机     *     * @param smsQueue     * @param directExchange     * @return     */    @Bean    public Binding smsQueueDirectExchange(Queue smsQueue, DirectExchange directExchange) {        return BindingBuilder.bind(smsQueue).to(directExchange).with(SMS_ROUTING_KEY);    }
    /**     * 将上面的email队列绑定到Direct交换机     *     * @param emailQueue     * @param directExchange     * @return     */    @Bean    public Binding emailQueueDirectExchange(Queue emailQueue, DirectExchange directExchange) {        return BindingBuilder.bind(emailQueue).to(directExchange).with(EMAIL_ROUTING_KEY);    }}

用户实体类

代码语言:javascript复制
@Datapublic class UserEntity implements Serializable {
    private Integer id;    private String name;}

生产者发送消息

代码语言:javascript复制
@RestControllerpublic class SmsProducerController {
    @Autowired    private RabbitTemplate rabbitTemplate;
    @GetMapping("/send/sms")    public void send() {
        UserEntity userEntity = new UserEntity();        userEntity.setId(new Random().nextInt());        userEntity.setName("短信:"   UUID.randomUUID().toString());        rabbitTemplate.convertAndSend(RabbbitMqConfig.EXCHANGE_NAME, RabbbitMqConfig.SMS_ROUTING_KEY, userEntity);
        System.out.println("短信生产者消息发送成功!!!");    }}
@RestControllerpublic class EmailProducerController {
    @Autowired    private RabbitTemplate rabbitTemplate;
    @GetMapping("/send/email")    public void send() {
        UserEntity userEntity = new UserEntity();        userEntity.setId(new Random().nextInt());        userEntity.setName("邮件:"   UUID.randomUUID().toString());        rabbitTemplate.convertAndSend(RabbbitMqConfig.EXCHANGE_NAME, RabbbitMqConfig.EMAIL_ROUTING_KEY, userEntity);
        System.out.println("邮件生产者消息发送成功!!!");    }}

生产者测试发送消息

打开浏览器,访问指定网址

代码语言:javascript复制
http://localhost:8081/send/smshttp://localhost:8081/send/email

登陆Mangerment界面,可以看到我们在配置文件中配置的交换机名称,

SpringBoot自动在RabbitMQ里面,已经帮我们创建好了,且交换机的类型为direct类型。

我们还可以点击交换机的名称,然后看到交换机绑定队列的关系图等。

然后可以看到,我绑定交换机的两个队列,分别都积压着消息没有被消费掉

消费者

消费者项目结构

yml文件

代码语言:javascript复制
server:  port: 8080
spring:  rabbitmq:    ####连接地址    host: 192.168.137.5    ####端口号    port: 5672    ####账号    username: sunny    ####密码    password: sunny    ### 交换机    virtual-host: /sunny_vm

用户实体类

代码语言:javascript复制
@Datapublic class UserEntity implements Serializable {
    private Integer id;    private String name;}

新建2个消费者,监听不同的队列

代码语言:javascript复制
@Component@RabbitListener(queues = {"email_direct_routingkey_queue"})public class EmailConsumerController {    @RabbitHandler    public void one(UserEntity userEntity, Channel channel, Message message) {        System.out.println("邮件队列接收到消息了!!!"   userEntity.getName());    }}

@Component@RabbitListener(queues = {"sms_direct_routingkey_queue"})public class SmsConsumerController {    @RabbitHandler    public void one(UserEntity userEntity, Channel channel, Message message) {        System.out.println("短信队列接收到消息了!!!"   userEntity.getName());    }}

启动消费者项目,项目启动后会自动消费消息

队列中积压的消息被成功消费

到此SpringBoot整合RabbitMQ实现路由模式代码Demo就结束拉

总结

1、队列与交换机的绑定,不能是任意绑定了,而是指定一个 RoutingKey(路由key) 2、消息的发送方,向Exchange发送消息时,也必须指定消息的RoutingKey 3、Exchange 不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才会接受到消息

我是黎明大大,我知道我没有惊世的才华,也没有超于凡人的能力,但毕竟我还有一个不屈服,敢于选择向命运冲锋的灵魂,和一个就是伤痕累累也要义无反顾走下去的心。

如果您觉得本文对您有帮助,还请关注点赞一波,后期将不间断更新更多技术文章

●RabbitMQ六种队列模式之发布订阅模式

●RabbitMQ六种队列模式之工作队列模式

●RabbitMQ六种队列模式之简单队列模式

●我们所了解的Redis分布式锁真的就万无一失吗?

●深入理解Redis的持久化机制

●Spring5.0源码深度解析之Spring是如何利用三级缓存解决循环依赖的问题

发现“在看”和“赞”了吗,因为你的点赞,让我元气满满哦

0 人点赞