04-RabbitMQ常用的六种模型以及在SpringBoot中的应用

2019-08-14 14:21:37 浏览数 (1)

在RabbitMQ中,我们常用的模型主要有六种,分别是:

  • Hello World
  • Work queues
  • Publish/Subscribe
  • Routing
  • Topic
  • RPC

俗话说得好,光说不练假把式,下面我们结合springBoot逐一实现这六种模型。

Hello World

从上图可以看出,这是一个默认交换机的单播路由,并且每个队列只有一个消费者。

Work queues

从上图可以看出,主要的部分是:默认交换机的单播路由,并且每个队列有多个消费者。

Publish/Subscribe

从上图可以看出,主要的部分是:扇形交换机的多播路由。

Routing

从上图可以看出,主要的部分是:直连交换机的多播路由。

Topic

从上图可以看出,主要的部分是:主题交换机的多播路由。

RPC

从上图可以看出,主要的部分是:默认交换机的单播路由。

环境

  • 下面我们代码演示一下除了RPC之外的其他五种模型,在SpringBoot中的用法

pom.xml

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>cn.qbz</groupId>
    <artifactId>rabbit-mq-test</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbit-mq-test</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

application.yml

代码语言:javascript复制
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /qbz-test #虚拟主机,必须存在,不然会报错。
    username: guest
    password: guest
    publisher-confirms: true #消息发送到交换机确认机制,是否确认回调,默认false
    publisher-returns: true #消息发送到交换机确认机制,是否返回回调,默认false
    listener:
      simple:
        prefetch: 1 #预先载入数量,默认值250
        concurrency: 10 #指定最小消费数量
        max-concurrency: 20 #指定最大的消费者数量,当并行消费者数量到达concurrency后会开至最大max-concurrency
        acknowledge-mode: manual # 采用手动应答,设置为手动应答后,消费者如果不进行手动应答,会处于假死状态,不能再消费。默认auto
        retry:
          enabled: true # 失败重试机制,默认为false.
          max-attempts: 1 # 失败后,再重试几次,默认为:3

RabbitMqTestApplication.java

代码语言:javascript复制
package cn.qbz.rabbitmqtest;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * RabbitMQ 测试
 *
 * @author qubianzhong
 * @Date 13:42 2019/7/26
 */
@SpringBootApplication
@EnableRabbit //新增开启Rabbit注解
public class RabbitMqTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMqTestApplication.class, args);
    }

}

RabbitMqProduceController.java

代码语言:javascript复制
package cn.qbz.rabbitmqtest.controller;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author qubianzhong
 * @date 2019/7/22 13:51
 */
@RestController
public class RabbitMqProduceController {

    @Autowired
    private AmqpTemplate amqpTemplate;
    @Autowired
    private AmqpAdmin amqpAdmin;

    @GetMapping(value = "/produce")
    public String produceMsg(@RequestParam(value = "msg") String msg) {
        /****************************hello/work start****************************
         * hello-world、work queues 均是只绑定队列
         * 发布消息时,如下所示,推荐使用第二种
         */
        //1.如果队列不存在,则消息会进入“黑洞”
        amqpTemplate.convertAndSend("hello", msg   System.currentTimeMillis());
        /**
         * 2.如果队列不存在,则进行创建,如果队列已存在,则使用此队列
         * public Queue(String name) {
         *      //The queue is durable, non-exclusive and non auto-delete.
         * 		this(name, true, false, false);
         *  }
         */
        Queue queue = new Queue("hello");
        amqpAdmin.declareQueue(queue);
        amqpTemplate.convertAndSend(queue.getName(), msg   System.currentTimeMillis());

        //3.work
        queue = new Queue("work");
        amqpAdmin.declareQueue(queue);
        for (int i = 0; i < 30; i  ) {
            amqpTemplate.convertAndSend(queue.getName(), "wwoork"   i);
        }
        /****************************hello/work end****************************/

        /****************************Publish/Subscribe start****************************
         * 生产者只向扇形交换机发送消息,扇形交换机负责向绑定其队列上的所有消费者进行分发。
         * public AbstractExchange(String name) {
         *      //Construct a new durable, non-auto-delete Exchange with the provided name.
         * 		this(name, true, false);
         * }
         */
        for (int i = 0; i < 5; i  ) {
            Exchange pubSubExchange = new FanoutExchange("pub-sub-exchange");
            amqpAdmin.declareExchange(pubSubExchange);
            amqpTemplate.convertAndSend(pubSubExchange.getName(), null, msg   System.currentTimeMillis());
        }
        /****************************Publish/Subscribe end****************************/

        /****************************Routing start****************************
         * Routing  消费者消费的时候,多个路由键绑定一个队列
         */
        Exchange routeExchange = new DirectExchange("routing-exchange");
        amqpAdmin.declareExchange(routeExchange);
        amqpTemplate.convertAndSend(routeExchange.getName(), "routing-log-error", "routing-log-error:"   System.currentTimeMillis());
        amqpTemplate.convertAndSend(routeExchange.getName(), "routing-log-info", "routing-log-info:"   System.currentTimeMillis());
        amqpTemplate.convertAndSend(routeExchange.getName(), "routing-log-waring", "routing-log-waring:"   System.currentTimeMillis());

        /****************************Routing end****************************/

        /****************************Topic start****************************
         * Topic  消费者消费的时候,多个路由键,模糊匹配,绑定一个队列,其实和routing差不多
         */
        Exchange topicExchange = new TopicExchange("topic-exchange");
        amqpAdmin.declareExchange(topicExchange);
        amqpTemplate.convertAndSend(topicExchange.getName(), "topic-log-error.20190706", "topic-log-error.20190706:"   System.currentTimeMillis());
        amqpTemplate.convertAndSend(topicExchange.getName(), "topic-log-info.20190606", "topic-log-info.20190606:"   System.currentTimeMillis());
        amqpTemplate.convertAndSend(topicExchange.getName(), "topic-log-error.20190708", "topic-log-error.20190708:"   System.currentTimeMillis());
        amqpTemplate.convertAndSend(topicExchange.getName(), "topic-log-waring.20190506", "topic-log-waring.20190506:"   System.currentTimeMillis());
        amqpTemplate.convertAndSend(topicExchange.getName(), "topic-log-waring.20190516", "topic-log-waring.20190516:"   System.currentTimeMillis());
        amqpTemplate.convertAndSend(topicExchange.getName(), "topic-log-info.20190723", "topic-log-info.20190723:"   System.currentTimeMillis());

        /****************************Topic end****************************/

        return "success";
    }
}

RabbitTestListener.java

代码语言:javascript复制
package cn.qbz.rabbitmqtest.rabbit;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author qubianzhong
 * @date 2019/7/26 14:40
 */
@Component
public class RabbitTestListener {
    /****************************demo 演示如何消费消息 start****************************
     * 推荐使用第二种或第三种
     * 当不需要绑定交换机的时候,如果使用第三种,exchange置为空会报错
     */

    /**
     * 1.此种用法需要手动创建队列,不然会报错
     */
    @RabbitListener(queues = "demo-1")
    public void demo1(Message message, Channel channel) throws IOException {
        System.err.println("demo-1:"   new String(message.getBody()));
        //信道上发布的消息都会被指派一个唯一的ID号:message.getMessageProperties().getDeliveryTag()
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);

        //1:接收到的被指派一个唯一的ID号:deliveryTag;
        // 2:true当前consumer拒绝所有的deliveryTag包括此次的,false当前consumer只拒绝此次的这个deliveryTag;
        // 3:true此消息重新排队,而不是被丢弃或者扔进死信队列中
        //channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);
    }

    /**
     * 2.如果队列不存在,则会新建;如果队列已存在,则使用此队列
     */
    @RabbitListener(queuesToDeclare = @Queue("demo-2"))
    public void demo2(Message message, Channel channel) throws IOException {
        System.err.println("demo-2:"   new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }


    /**
     * 3.绑定队列、交换机
     * 如果队列或交换机不存在,则新建;如果已存在,则使用。
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("demo-3"),
            exchange = @Exchange("demo-exchange-3")
    ))
    public void demo3(Message message, Channel channel) throws IOException {
        System.err.println("demo-3:"   new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }

    /****************************demo 演示如何消费消息 start****************************/

    /****************************hello start****************************
     * 消费 hello-world
     */
    @RabbitListener(queuesToDeclare = {@Queue("hello")})
    public void hello(Message message, Channel channel) throws IOException {
        System.err.println("hello:"   new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }
    /****************************hello end****************************/

    /****************************work start****************************
     * 消费 work 一个队列对应多个消费者,此时,消息是平均分配到每个消费者手里的。
     */
    @RabbitListener(queuesToDeclare = {@Queue("work")})
    public void work1(Message message, Channel channel) throws IOException {
        System.err.println("work1:"   new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }

    @RabbitListener(queuesToDeclare = {@Queue("work")})
    public void work2(Message message, Channel channel) throws InterruptedException, IOException {
        Thread.sleep(1230);
        System.err.println("work2:"   new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }

    @RabbitListener(queuesToDeclare = {@Queue("work")})
    public void work3(Message message, Channel channel) throws InterruptedException, IOException {
        Thread.sleep(30);
        System.err.println("work3:"   new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }
    /****************************work end****************************/


    /****************************Publish/Subscribe start****************************
     * 消费 Publish/Subscribe 扇形交换机负责向绑定其队列上的所有消费者进行分发。
     * 此处,exchange和queue如果不存在,则会新建。
     * 注意:@Exchange注解属性 type需要定义为 fanout,不然会是默认的 direct
     */
    @RabbitListener(bindings = {@QueueBinding(
            value = @Queue("sub-pub-queue1"),
            exchange = @Exchange(value = "pub-sub-exchange", type = ExchangeTypes.FANOUT)
    )})
    public void pubSub1(Message message, Channel channel) throws IOException {
        System.err.println("sub-pub-queue1:"   new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }

    @RabbitListener(bindings = {@QueueBinding(
            value = @Queue("sub-pub-queue2"),
            exchange = @Exchange(value = "pub-sub-exchange", type = ExchangeTypes.FANOUT)
    )})
    public void pubSub2(Message message, Channel channel) throws IOException {
        System.err.println("sub-pub-queue2:"   new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }
    /****************************Publish/Subscribe end****************************/


    /****************************Routing start****************************
     * 消费 Routing
     * 其中一个队列,只消费error信息,一个队列消费所有信息
     */
    @RabbitListener(bindings = {@QueueBinding(value = @Queue("routing-log-error"), key = {"routing-log-error"}, exchange = @Exchange(value = "routing-exchange"))})
    public void routing1(Message message, Channel channel) throws IOException {
        System.err.println("routing-log-error:"   new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }

    @RabbitListener(bindings = {@QueueBinding(value = @Queue("routing-log-all"), key = {"routing-log-error", "routing-log-info", "routing-log-waring"}, exchange = @Exchange(value = "routing-exchange")
    )})
    public void routing2(Message message, Channel channel) throws IOException {
        System.err.println("routing-log-all:"   new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }
    /****************************Routing end****************************/


    /****************************Topic start****************************
     * 消费 Topic
     * 其中一个队列,消费前缀为'topic-log-info.'的信息,
     * 一个队列消费前缀为'topic-log-error.'的和模糊匹配'.201907.'的信息
     */
    @RabbitListener(bindings = {@QueueBinding(value = @Queue("topic-log-1"), key = {"topic-log-info.*"}, exchange = @Exchange(value = "topic-exchange", type = ExchangeTypes.TOPIC))})
    public void topic1(Message message, Channel channel) throws IOException {
        System.err.println("所有的info:"   new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }

    @RabbitListener(bindings = {@QueueBinding(value = @Queue("topic-log-2"), key = {"*.201907.*", "topic-log-error.*"}, exchange = @Exchange(value = "topic-exchange", type = ExchangeTypes.TOPIC))})
    public void topic2(Message message, Channel channel) throws IOException {
        System.err.println("所有的error and 7月份:"   new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }
    /****************************Routing end****************************/

}

RabbitTemplateCallback.java

代码语言:javascript复制
package cn.qbz.rabbitmqtest.rabbit;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author qubianzhong
 * @date 2019/7/31 10:56
 */
@Component
public class RabbitTemplateCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this::confirm);
        this.rabbitTemplate.setReturnCallback(this::returnedMessage);
    }

    /**
     * Confirmation callback.
     *
     * @param correlationData correlation data for the callback.
     * @param ack             true for ack, false for nack
     * @param cause           An optional cause, for nack, when available, otherwise null.
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            System.err.println("消息未能在交换机上得到确认!异常处理......");
        }
    }

    /**
     * Returned message callback.
     *
     * @param message    the returned message.
     * @param replyCode  the reply code.
     * @param replyText  the reply text.
     * @param exchange   the exchange.
     * @param routingKey the routing key.
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.err.println("消息未能到达队列!"   "return exchange: "   exchange   ", routingKey: "
                  routingKey   ", replyCode: "   replyCode   ", replyText: "   replyText);
    }
}

RPC 补充

我们并不推荐RPC式的mq调用,这么做完全没有发挥mq异步削峰的作用。如果有使用RPC的需求,请移步SpringCloud或者Dubbo。

我们虽然不使用RabbitMQ来进行RPC调用,但是我们也要了解,RabbitMQ为啥子可以实现RPC。

当使用RabbitMQ来实现RPC时.你只是简单地发布消息而已。RabbitMQ会负责使用绑定来路由消息到达合适的队列。RPC服务器会从这些队列上消费消息。RabbitMQ替你完成了所有这些艰难的工作:将消息路由到合适的地方,通过多台RPC服务器对RPC消息进行负载均衡,甚至当处理消息的服务器崩溃时,将RPC消息重发到另一台。

问题在于。如何将应答返回给客户端呢?毕竟,到目前为止你体验的RabbitMQ是发后即忘模型。

RabbitMQ团队想出了一个优雅的解决方案:使用消息来发回应答。在每个AMQP消息头里有个字段叫作reply_ to,消息的生产者可以通过该字段来确定队列名称,并监听队列等待应答。然后接收消息的RPC服务器能够检杳reply _to字段,并创建包含应答内容的新的消息,并以队列名称作为路由键。

你也许想:“光是每次创建唯一队列名就得花很多工夫吧。我们怎样阻止其他客户端读到应答消息呢?”

我们前面说过,如果你声明了没有名字的队列,RabbitMQ会为你指定一个。这个名字恰好是唯一的队列名;同时在声明的时候指定exclusive参数.确保只有你可以读取队列上的消息。所有RPC客户端需要做的是声明临时的、排他的、匿名队列,并将该队列名称包含到RPC消息的reply _to头中。于是服务器端就知道应答消息该发往哪儿了。值得注意的是我们并没有提到将应答队列绑定到交换器上。这是因为当RPC服务器将应答消息发布到RabbitMQ而没有指定交换器时.RabbitMQ就知道目的地是应答队列,路由键就是队列的名称。

0 人点赞