SpringBoot整合RabbitMQ消息队列-学习笔记

2019-09-09 16:45:32 浏览数 (1)

SpringBoot整合RabbitMQ消息队列-学习笔记

前言

本篇文章主要用于记录个人学习RabbitMQ的过程,记录下来方便日后查看,如有错误的地方,还望指正。

本篇文章比较详细地记录本人在学习过程中的每一个步骤,比较适合对RabbitMQ不熟的同学学习,跟着本文操作一遍,就可以大概知道RabbitMQ的基础知识了。

准备阶段

首先把RabbitMQ环境安装好,下面再详细介绍RabbitMQ各个知识点和如何使用。

由于是基于Centos7的操作系统安装RabbitMQ-3.7.7。

为了方便操作,先把防火墙干掉,生产环境当然不能这么干,个人学习随意,以下是相关命令:

centos7关闭并禁止防火墙启动命令:

代码语言:javascript复制
  1. systemctl stop firewalld
  2. systemctl disable firewalld

RabbitMQ安装

这里介绍一种比较简单的安装方法-依赖安装,不用单独安装erlang等依赖。

首先到RabbitMQ官网下载:http://www.rabbitmq.com/download.html,

选择合适你的操作系统版本,本人的操作系统是Centos7.5,所以选择RHEL/CentOS 7.x这个。

把下载好的rabbitmq-server-3.7.7-1.el7.noarch.rpm放到/home目录,由于RabbitMQ-3.7.7需要安装比较新的erlang-v19.3以上,而yum上并没有这么高的版本,所以需要在/etc/yum.repos.d/目录下创建文件rabbitmq-erlang.repo,命令如下:

代码语言:javascript复制
  1. cd /etc/yum.repos.d/
  2. touch rabbitmq-erlang.repo

编辑rabbitmq-erlang.repo命令如下:

代码语言:javascript复制
vi rabbitmq-erlang.repo

添加以下内容到rabbitmq-erlang.repo:

代码语言:javascript复制
  1. [rabbitmq-erlang]
  2. name=rabbitmq-erlang
  3. baseurl=https://dl.bintray.com/rabbitmq/rpm/erlang/21/el/7/
  4. gpgcheck=1
  5. gpgkey=https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
  6. repo_gpgcheck=0
  7. enabled=1

上面baseurl是指向erlang-v21版本的链接。

cd到/home目录,执行安装RabbitMQ的命令:

代码语言:javascript复制
yum install -y rabbitmq-server-3.7.7-1.el7.noarch.rpm

这个过程会下载安装依赖的erlang等依赖,等待安装完成,会出现下面的界面,则说明RabbitMQ就已经安装完成了。

代码语言:javascript复制
  1. Running transaction check
  2. Running transaction test
  3. Transaction test succeeded
  4. Running transaction
  5. 正在安装 : erlang-21.0.5-1.el7.centos.x86_64 1/3
  6. 正在安装 : socat-1.7.3.2-2.el7.x86_64 2/3
  7. 正在安装 : rabbitmq-server-3.7.7-1.el7.noarch 3/3
  8. 验证中 : socat-1.7.3.2-2.el7.x86_64 1/3
  9. 验证中 : rabbitmq-server-3.7.7-1.el7.noarch 2/3
  10. 验证中 : erlang-21.0.5-1.el7.centos.x86_64 3/3
  11. 已安装:
  12. rabbitmq-server.noarch 0:3.7.7-1.el7
  13. 作为依赖被安装:
  14. erlang.x86_64 0:21.0.5-1.el7.centos socat.x86_64 0:1.7.3.2-2.el7
  15. 完毕!

RabbitMQ设置

启动RabbitMQ服务:

代码语言:javascript复制
service rabbitmq-server start

刚安装好的RabbitMQ是还没有用户的,也不能访问RabbitMQ的web管理后台,接下来先添加一个叫root的用户:

代码语言:javascript复制
  1. rabbitmqctl add_user root root 
  2. rabbitmqctl set_user_tags root administrator
  3. rabbitmqctl set_permissions -p / root "." "." ".*"
  4. #更多命令查看:rabbitmqctl --help

启用web访问权限:

代码语言:javascript复制
rabbitmq-plugins enable rabbitmq_management

重启RabbitMQ服务:

代码语言:javascript复制
service rabbitmq-server restart

然后在浏览器输入:http://ip:15672/ ,这时可以看到RabbitMQ管理页面了,输入刚刚添加的账号root,密码root即可进入。

登录进去后界面如下:

RabbitMQ是基于Virtual Host来进行权限控制的,现在为我们刚刚添加的root用户添加一个Virtual Host,在RabbitMQ的web管理后台,根据下图进行添加一个virtual host,添加成功后默认分配给root用户了。

RabbitMQ简介

    RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种语言平台的客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

通常我们谈到消息队列, 会有三个概念: 消息生产者(Provider)、队列(Queue)、消息消费者(Consumer),RabbitMQ 在这个基本概念上, 多做了一层抽象, 在消息生产者和队列之间, 加入了交换器 (Exchange)。这样消息生产者和队列就没有直接联系, 变成消息生产者把消息发送给交换器, 交换器根据调度策略再把消息发送给队列。

  1. 左侧P代表消息生产者,也就是往RabbitMQ发消息的程序。
  2. 中间即是RabbitMQ,其中包括交换机(Exchange)和队列(Queue)。
  3. 右侧C代表消费者,也就是往RabbitMQ拿消息的程序。

其中比较重要的概念有:虚拟主机(Virtual Host)、交换机(Exchange)、队列(Queue)、绑定(Binding)。

虚拟主机(Virtual Hosts)

        在上面已经说明如何为一个用户创建一个Virtual Host,一个虚拟主机持有一组交换机、队列和绑定。在RabbitMQ当中,用户只能在虚拟主机这个粒度上进行权限的控制。 如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”。

交换机(Exchange)

        交换机的功能主要是接收消息并且根据转发策略转发到对应的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误,这个ack模式后面再详细讨论。交换机有四种类型:Direct, topic, Headers and Fanout

队列(Queue)

        队列用于存放消息的载体,一般是和交换机进行绑定,交换机根据转发策略把消息转发到队列里。

绑定(Binding)

        也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。

交换机类型介绍

 Direct Exchange:

        direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个binding_key, 消息的routing_key与binding_key匹配时, 才会被交换器投送到绑定的队列中去.

Topic:

        转发消息主要是根据通配符。 在这种交换机下,队列和交换机的绑定会定义一种路由模式,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。

  1. 路由键必须是一串字符,用句号(.)隔开,比如说 topic.message,或者 topic.message.detail 等。
  2. 路由模式必须包含一个星号(*),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样:topic.*,那么就只能匹配路由键是:topic.message、topic.other等,第一个单词是 topic,第二个单词可以是任意一个单词。 井号(#)就表示一个或者多个单词,例如一个匹配模式是topic.#,那么可以匹配到例如:topic.message、topic.message.detail等,以topic.开头的路由键都可以匹配到。

 Fanout:

        Fanout类型类似于消息广播,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routing_key会被忽略。

Headers:

        设置header attribute参数类型的交换机

项目简介

    本文是基于Springboot-1.5.15整合RabbitMQ来进行讲解,在真实工作中,生产者和消费者一般是在不同的项目里,各自负责不同的职责,这里为了模拟真实环境,创建两个不同的项目进行演示。创建两个maven项目,消息生产者mq-rabbit-provider和消息消费者mq-rabbit-consumer,两个项目的pom.xml文件添加相同依赖:

代码语言:javascript复制
  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-web</artifactId>
  8. </dependency>

mq-rabbit-provider项目的application.properties内容如下:

代码语言:javascript复制
  1. server.port=8080
  2. spring.application.name=springboot-rabbitmq-provider
  3. spring.rabbitmq.host=10.211.55.3
  4. spring.rabbitmq.port=5672
  5. spring.rabbitmq.username=root
  6. spring.rabbitmq.password=root
  7. #RabbitMQ的虚拟host
  8. spring.rabbitmq.virtual-host=CalonHost

mq-rabbit-consumer项目的application.properties内容如下:

代码语言:javascript复制
  1. server.port=9090
  2. spring.application.name=springboot-rabbitmq-consumer
  3. spring.rabbitmq.host=10.211.55.3
  4. spring.rabbitmq.port=5672
  5. spring.rabbitmq.username=root
  6. spring.rabbitmq.password=root
  7. #RabbitMQ的虚拟host
  8. spring.rabbitmq.virtual-host=CalonHost

这里只是端口和应用名不同,其他都一样。

接下来分别介绍Direct、Topic、Fanout等3种不同交换机的使用例子。

Direct Exchange

    在mq-rabbit-provider项目建一个配置类DirectRabbitConfig.java,配置交换机、队列、BindingKey=CalonDirectRouting的绑定关系,代码如下:

代码语言:javascript复制
  1. @Configuration
  2. public class DirectRabbitConfig {
  3. //队列
  4. @Bean
  5. public Queue CalonDirectQueue() {
  6. return new Queue("CalonDirectQueue",true);
  7. }
  8. //Direct交换机
  9. @Bean
  10. DirectExchange CalonDirectExchange() {
  11. return new DirectExchange("CalonDirectExchange");
  12. }
  13. //绑定
  14. @Bean
  15. Binding bindingDirect() {
  16. return BindingBuilder.bind(CalonDirectQueue()).to(CalonDirectExchange()).with("CalonDirectRouting");
  17. }
  18. }

    创建一个实体类User.java,这里说明一下,该实体类是消息的主体,所以必须实现Serializable接口,否则在消息消费者项目读取消息时会报错,代码如下:

代码语言:javascript复制
  1. package mq.rabbit.entity;
  2. import java.io.Serializable;
  3. public class User implements Serializable{
  4. private static final long serialVersionUID = 1L;
  5. private String id;
  6. private String username;
  7. private String password;
  8. private String type;
  9. public String getId() {
  10. return id;
  11. }
  12. public void setId(String id) {
  13. this.id = id;
  14. }
  15. public String getUsername() {
  16. return username;
  17. }
  18. public void setUsername(String username) {
  19. this.username = username;
  20. }
  21. public String getPassword() {
  22. return password;
  23. }
  24. public void setPassword(String password) {
  25. this.password = password;
  26. }
  27. public String getType() {
  28. return type;
  29. }
  30. public void setType(String type) {
  31. this.type = type;
  32. }
  33. public User() {
  34. super();
  35. }
  36. public User(String id, String username, String password, String type) {
  37. super();
  38. this.id = id;
  39. this.username = username;
  40. this.password = password;
  41. this.type = type;
  42. }
  43. }

下面创建一个Controller,利用http请求进行调试,CalonDirectExchange是上面配置的交换机标识,CalonDirectRouting就是上面绑定好的queue名字,由于上面已经配置好交换机和队列的绑定关系,这两个组合就可以知道消息最终是发送到队列CalonDirectQueue里面去了,Controller类的代码如下:

代码语言:javascript复制
  1. @Controller
  2. public class SendController {
  3. @Autowired
  4. private RabbitTemplate template;
  5. @GetMapping("/sendDirect")
  6. private @ResponseBody String sendDirect(String message) throws Exception {
  7. User user = new User(UUID.randomUUID().toString(), message, "123456", "sendDirect");
  8. template.convertAndSend("CalonDirectExchange", "CalonDirectRouting", user);
  9. return "OK,sendDirect:" message;
  10. }
  11. }

启动mq-rabbit-provider项目,在浏览器输入:

代码语言:javascript复制
http://localhost:8080/sendDirect?message=123

再去RabbitMQ的web管理后台查看,你会发现在Queue里找到刚刚添加的那个队列,后面的数字就是消息数量有变化,说明消息已经存储进去了:

    把mq-rabbit-provider项目里的User类和DirectRabbitConfig类复制到mq-rabbit-consumer项目,User类用于读取消息时接收消息对象,DirectRabbitConfig可以不复制,但是如果RabbitMQ里还没有被监听的队列时会报错,复制过来是为了让RabbitMQ里还没有被监听的队列时自动创建该队列,防止报错。

创建队列监听类DirectReceiver.java,代码如下:

代码语言:javascript复制
  1. package mq.rabbit.receiver;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. import mq.rabbit.entity.User;
  6. @Component
  7. @RabbitListener(queues = "CalonDirectQueue")//CalonDirectQueue为队列名称
  8. public class DirectReceiver {
  9. @RabbitHandler
  10. public void process(User user) {
  11. System.out.println("DirectReceiver消费者收到消息 : " user.getId() "," user.getUsername() "," user.getPassword() "," user.getType());
  12. }
  13. }

启动mq-rabbit-consumer项目,就会收到之前发送到CalonDirectQueue队列的消息了,继续调用上面的请求/sendDirect,消息消费者会继续收到消息。

Topic Exchange

在mq-rabbit-provider项目建一个配置类TopicRabbitConfig.java,配置交换机、队列、BindingKey的绑定关系,代码如下:

代码语言:javascript复制
  1. package mq.rabbit.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.amqp.core.TopicExchange;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. /**
  9. * Topic Exchange类型交换机
  10. * @author calon
  11. *
  12. */
  13. @Configuration
  14. public class TopicRabbitConfig {
  15. public final static String first = "topic.first";
  16. public final static String second = "topic.second";
  17. @Bean
  18. public Queue firstQueue() {
  19. return new Queue(TopicRabbitConfig.first);
  20. }
  21. @Bean
  22. public Queue secondQueue() {
  23. return new Queue(TopicRabbitConfig.second);
  24. }
  25. @Bean
  26. TopicExchange exchange() {
  27. return new TopicExchange("topicExchange");
  28. }
  29. //绑定topic.first队列到routingKey为topic.first,只有topic.first的routingKey消息才发送到此队列
  30. @Bean
  31. Binding bindingExchangeMessage() {
  32. return BindingBuilder.bind(firstQueue()).to(exchange()).with(first);
  33. }
  34. //绑定topic.second队列到topic.#,凡是topic.开头的routingKey消息都发送到此队列
  35. @Bean
  36. Binding bindingExchangeMessage2() {
  37. return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
  38. }
  39. }

Topic Exchange类型的交换机是基于模糊匹配规则,所以这里创建两个Queue,分别绑定到两个BindingKey:topic.first和topic.#,用来测试消息进到哪个队列里。

在SendController类里添加两个request,代码如下:

代码语言:javascript复制
  1. @Controller
  2. public class SendController {
  3. @Autowired
  4. private RabbitTemplate template;
  5. @GetMapping("/sendTopicFirst")
  6. private @ResponseBody String sendTopicFirst(String message) {
  7. User user = new User(UUID.randomUUID().toString(), message, "123456", "sendTopicFirst");
  8. template.convertAndSend("topicExchange", "topic.first", user);
  9. return "OK,sendTopicFirst:" message;
  10. }
  11. @GetMapping("/sendTopicSecond")
  12. private @ResponseBody String sendTopicSecond(String message) {
  13. User user = new User(UUID.randomUUID().toString(), message, "123456", "sendTopicSecond");
  14. template.convertAndSend("topicExchange", "topic.second", user);
  15. return "OK,sendTopicSecond:" message;
  16. }
  17. }

当我们调用/sendTopicFirst请求时,交换机为topicExchange,routingKey为topic.first,按照上面bindingKey的配置,可以匹配到topic.first和topic.#规则,对应的队列是topic.first和topic.second,所以一条消息进到两个队列里。

当调用/sendTopicSecond请求时,交换机为topicExchange,routingKey为topic.second,匹配到topic.#规则,对应的队列是topic.second,所以消息进到topic.second队列里,除了#匹配规则,大家可以自行试试星号(*)这个匹配规则,*符号是匹配一个单词的。

把mq-rabbit-provider项目里的TopicRabbitConfig类复制到mq-rabbit-consumer项目,分别创建TopicFirstReceiver和TopicSecondReceiver消息监听类,代码如下:

代码语言:javascript复制
  1. package mq.rabbit.receiver;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. import mq.rabbit.entity.User;
  6. @Component
  7. @RabbitListener(queues = "topic.first")
  8. public class TopicFirstReceiver {
  9. @RabbitHandler
  10. public void process(User user) {
  11. System.out.println("TopicFirstReceiver消费者收到消息 : " user.getId() "," user.getUsername() "," user.getPassword() "," user.getType());
  12. }
  13. }
代码语言:javascript复制
  1. package mq.rabbit.receiver;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. import mq.rabbit.entity.User;
  6. @Component
  7. @RabbitListener(queues = "topic.second")
  8. public class TopicSecondReceiver {
  9. @RabbitHandler
  10. public void process(User user) {
  11. System.out.println("TopicSecondReceiver消费者收到消息 : " user.getId() "," user.getUsername() "," user.getPassword() "," user.getType());
  12. }
  13. }

启动mq-rabbit-consumer项目,会发现分别接收到各自监听的队列的消息。

Fanout Exchang

    在mq-rabbit-provider项目建一个配置类FanoutRabbitConfig.java,配置交换机、队列的绑定关系,代码如下:    

代码语言:javascript复制
  1. package mq.rabbit.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.FanoutExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. @Configuration
  9. public class FanoutRabbitConfig {
  10. @Bean
  11. public Queue AMessage() {
  12. return new Queue("fanout.A");
  13. }
  14. @Bean
  15. public Queue BMessage() {
  16. return new Queue("fanout.B");
  17. }
  18. @Bean
  19. public Queue CMessage() {
  20. return new Queue("fanout.C");
  21. }
  22. @Bean
  23. FanoutExchange fanoutExchange() {
  24. return new FanoutExchange("fanoutExchange");
  25. }
  26. @Bean
  27. Binding bindingExchangeA() {
  28. return BindingBuilder.bind(AMessage()).to(fanoutExchange());
  29. }
  30. @Bean
  31. Binding bindingExchangeB() {
  32. return BindingBuilder.bind(BMessage()).to(fanoutExchange());
  33. }
  34. @Bean
  35. Binding bindingExchangeC() {
  36. return BindingBuilder.bind(CMessage()).to(fanoutExchange());
  37. }
  38. }

这里创建三个队列fanout.A、fanout.B、fanout.C,都绑定到FanoutExchange交换机fanoutExchange上。

在SendController类添加一个请求/sendFanout,代码如下:

代码语言:javascript复制
  1. package mq.rabbit.controller;
  2. import java.util.UUID;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Controller;
  6. import org.springframework.web.bind.annotation.GetMapping;
  7. import org.springframework.web.bind.annotation.PathVariable;
  8. import org.springframework.web.bind.annotation.ResponseBody;
  9. import com.fasterxml.jackson.databind.ObjectMapper;
  10. import mq.rabbit.entity.User;
  11. @Controller
  12. public class SendController {
  13. @Autowired
  14. private RabbitTemplate template;
  15. @GetMapping("/sendFanout")
  16. private @ResponseBody String sendFanout(String message) {
  17. User user = new User(UUID.randomUUID().toString(), message, "123456", "sendFanout");
  18. template.convertAndSend("fanoutExchange", null, user);
  19. return "OK,sendFanout:" message;
  20. }
  21. }

当调用/sendFanout请求时,在RabbitMQ的web管理界面看到三个队列fanout.A、fanout.B、fanout.C都有一条消息,在Fanout交换机里,如果有设置BindingKey,Fanout交换机会忽略已设置的BindingKey,把消息发送到绑定该交换机的所有队列里。

把mq-rabbit-provider项目里的FanoutRabbitConfig类复制到mq-rabbit-consumer项目,分别创建FanoutReceiverA、FanoutReceiverB和FanoutReceiverC类,代码如下:

代码语言:javascript复制
  1. package mq.rabbit.receiver;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. import mq.rabbit.entity.User;
  6. @Component
  7. @RabbitListener(queues = "fanout.A")
  8. public class FanoutReceiverA {
  9. @RabbitHandler
  10. public void process(User user) {
  11. System.out.println("FanoutReceiverA消费者收到消息 : " user.getId() "," user.getUsername() "," user.getPassword() "," user.getType());
  12. }
  13. }
代码语言:javascript复制
  1. package mq.rabbit.receiver;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. import mq.rabbit.entity.User;
  6. @Component
  7. @RabbitListener(queues = "fanout.B")
  8. public class FanoutReceiverB {
  9. @RabbitHandler
  10. public void process(User user) {
  11. System.out.println("FanoutReceiverB消费者收到消息 : " user.getId() "," user.getUsername() "," user.getPassword() "," user.getType());
  12. }
  13. }
代码语言:javascript复制
  1. package mq.rabbit.receiver;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. import mq.rabbit.entity.User;
  6. @Component
  7. @RabbitListener(queues = "fanout.C")
  8. public class FanoutReceiverC {
  9. @RabbitHandler
  10. public void process(User user) {
  11. System.out.println("FanoutReceiverC消费者收到消息 : " user.getId() "," user.getUsername() "," user.getPassword() "," user.getType());
  12. }
  13. }

上面也可以在一个类里写3个方法来进行对队列的监听,不同的地方在于把@RabbitListener移到方法上即可。

启动mq-rabbit-consumer,即可收到队列的消息。

RabbitMQ消息的确认机制

    在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的生产者在将消息发送出去之后,消息到底有没有正确到达服务器?如果不进行特殊配置的话,默认情况下发布消息是不会返回任何信息给生产者的,也就是生产者是不知道消息有没有正确到达消息服务器,同理,消息消费者在接收消息后,如果在执行业务逻辑过程出现异常崩溃等情况,会导致消息丢失,所以我们需要对消息的发送和消费进行确认,确保消息能够被正确的存储和消费。RabbitMQ为我们提供了两种方式:1、事务机制;2、确认机制。下面介绍消息确认机制。

生产者消息确认机制:

先把例子跑起来,下面再做详细介绍。在mq-rabbit-provider项目的application.properties文件添加以下属性:

代码语言:javascript复制
  1. #确认消息已发送到交换机(Exchange)
  2. spring.rabbitmq.publisher-confirms=true
  3. #确认消息已发送到队列(Queue)
  4. spring.rabbitmq.publisher-returns=true

在mq-rabbit-provider项目创建配置类RabbitConfig.java,代码如下:

代码语言:javascript复制
  1. package mq.rabbit.config;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.amqp.rabbit.support.CorrelationData;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. @Configuration
  9. public class RabbitConfig {
  10. @Bean
  11. public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
  12. RabbitTemplate rabbitTemplate = new RabbitTemplate();
  13. rabbitTemplate.setConnectionFactory(connectionFactory);
  14. rabbitTemplate.setMandatory(true);//必须设置为true,才能让下面的ReturnCallback函数生效
  15. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  16. @Override
  17. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  18. System.out.println("=======ConfirmCallback=========");
  19. System.out.println("correlationData = " correlationData);
  20. System.out.println("ack = " ack);
  21. System.out.println("cause = " cause);
  22. System.out.println("=======ConfirmCallback=========");
  23. }
  24. });
  25. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
  26. @Override
  27. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  28. System.out.println("--------------ReturnCallback----------------");
  29. System.out.println("message = " message);
  30. System.out.println("replyCode = " replyCode);
  31. System.out.println("replyText = " replyText);
  32. System.out.println("exchange = " exchange);
  33. System.out.println("routingKey = " routingKey);
  34. System.out.println("--------------ReturnCallback----------------");
  35. }
  36. });
  37. return rabbitTemplate;
  38. }
  39. }

RabbitMQ生产者是依赖两个回调函数来实现确认的,分别是ConfirmCallback和ConfirmCallback,如上面的代码。按以下4种情况进行回调:

1、消息找不到交换机(Exchange)时回调ConfirmCallback,返回ack=false,代码如下:

代码语言:javascript复制
  1. =======ConfirmCallback=========
  2. correlationData = null
  3. ack = false
  4. cause = channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'CalonDirectExchange1' in vhost 'calonHost', class-id=60, method-id=40)
  5. =======ConfirmCallback=========
  6. 2018-08-30 09:59:37.892 ERROR 55704 --- [0.211.55.3:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'CalonDirectExchange1' in vhost 'calonHost', class-id=60, method-id=40)

2、消息找到交换机(Exchange)但找不到队列(Queue)时回调ConfirmCallback和ReturnCallback,返回ack=true,replyCode = 312,replyText = NO_ROUTE,代码如下:

代码语言:javascript复制
  1. --------------ReturnCallback----------------
  2. message = (Body:'[B@bf8af5b(byte[179])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/x-java-serialized-object, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=null, receivedExchange=null, receivedRoutingKey=null, receivedDelay=null, deliveryTag=0, messageCount=null, consumerTag=null, consumerQueue=null])
  3. replyCode = 312
  4. replyText = NO_ROUTE
  5. exchange = CalonDirectExchange
  6. routingKey = CalonDirectRouting1
  7. --------------ReturnCallback----------------
  8. =======ConfirmCallback=========
  9. correlationData = null
  10. ack = true
  11. cause = null
  12. =======ConfirmCallback=========

3、消息既找不到交换机(Exchange)又找不到队列(Queue)时回调ConfirmCallback,返回ack=false,代码如下:

代码语言:javascript复制
  1. =======ConfirmCallback=========
  2. correlationData = null
  3. ack = false
  4. cause = channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'CalonDirectExchange1' in vhost 'calonHost', class-id=60, method-id=40)
  5. =======ConfirmCallback=========
  6. 2018-08-30 10:03:22.204 ERROR 55704 --- [0.211.55.3:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'CalonDirectExchange1' in vhost 'calonHost', class-id=60, method-id=40)

4、消息成功发送回调ConfirmCallback,返回ack=true,代码如下:

代码语言:javascript复制
  1. =======ConfirmCallback=========
  2. correlationData = null
  3. ack = true
  4. cause = null
  5. =======ConfirmCallback=========

根据上面4种状态,我们可以在这两个回调函数里根据返回的状态进行业务方面的处理,比如业务回滚或者重新发送消息等,可以基于上面SendController类对其中一个请求进行测试,更改exchange和routingKey来测试一下这4种状态,这个就是生产消息的确认机制。

消费者消息确认机制:

    在mq-rabbit-consumer项目的DirectRabbitConfig配置类进行消息消费确认机制的配置,代码如下:

代码语言:javascript复制
  1. package mq.rabbit.config;
  2. import org.springframework.amqp.core.AcknowledgeMode;
  3. import org.springframework.amqp.core.Binding;
  4. import org.springframework.amqp.core.BindingBuilder;
  5. import org.springframework.amqp.core.DirectExchange;
  6. import org.springframework.amqp.core.Queue;
  7. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  8. import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.context.annotation.Bean;
  11. import org.springframework.context.annotation.Configuration;
  12. import mq.rabbit.receiver.DirectAckReceiver;
  13. @Configuration
  14. public class DirectRabbitConfig {
  15. @Bean
  16. public Queue CalonDirectQueue() {
  17. return new Queue("CalonDirectQueue",true);
  18. }
  19. @Bean
  20. DirectExchange CalonDirectExchange() {
  21. return new DirectExchange("CalonDirectExchange");
  22. }
  23. @Bean
  24. Binding bindingDirect() {
  25. return BindingBuilder.bind(CalonDirectQueue()).to(CalonDirectExchange()).with("CalonDirectRouting");
  26. }
  27. @Autowired
  28. private CachingConnectionFactory connectionFactory;
  29. @Autowired
  30. private DirectAckReceiver directAckReceiver;//消息接收处理类
  31. @Bean
  32. public SimpleMessageListenerContainer simpleMessageListenerContainer() {
  33. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
  34. container.setConcurrentConsumers(1);
  35. container.setMaxConcurrentConsumers(1);
  36. container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息
  37. container.setQueues(CalonDirectQueue());
  38. container.setMessageListener(directAckReceiver);
  39. return container;
  40. }
  41. }

在mq-rabbit-consumer项目新建消息监听类DirectAckReceiver.java,用于处理消息的确认操作,代码如下:

代码语言:javascript复制
  1. package mq.rabbit.receiver;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Component;
  6. import com.fasterxml.jackson.databind.ObjectMapper;
  7. import com.rabbitmq.client.Channel;
  8. import mq.rabbit.entity.User;
  9. @Component
  10. public class DirectAckReceiver implements ChannelAwareMessageListener {
  11. @Autowired
  12. private ObjectMapper objectMapper;
  13. @Override
  14. public void onMessage(Message message, Channel channel) throws Exception {
  15. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  16. try {
  17. byte[] body = message.getBody();
  18. User user = objectMapper.readValue(body, User.class);
  19. System.out.println("DirectAckReceiver消费者收到消息 : " user.getId() "," user.getUsername() "," user.getPassword() "," user.getType());
  20. channel.basicAck(deliveryTag, true);
  21. // channel.basicReject(deliveryTag, true);//为true会重新放回队列
  22. } catch (Exception e) {
  23. channel.basicReject(deliveryTag, false);
  24. e.printStackTrace();
  25. }
  26. }
  27. }

消息接收确认模式有以下3种:

  • AcknowledgeMode.NONE:不确认
  • AcknowledgeMode.AUTO:自动确认
  • AcknowledgeMode.MANUAL:手动确认

默认情况下是自动确认,如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息,在实际应用中,我们希望每条消息都能够被正确消费而不是出现丢失的情况,上面代码是开启手动确认模式,下面看看手动确认都有哪几种方式:

  • 成功确认:void basicAck(long deliveryTag, boolean multiple) throws IOException;

            deliveryTag:该消息的index

            multiple:是否批量. true:将一次性ack所有小于deliveryTag的消息。

        消费者成功处理后,调用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)方法对消息进行确认。

  • 失败确认:void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

            deliveryTag:该消息的index。

            multiple:是否批量. true:将一次性拒绝所有小于deliveryTag的消息。

            requeue:是否重新入队列。

  • 拒绝确认:void basicReject(long deliveryTag, boolean requeue) throws IOException;

            deliveryTag:该消息的index。

            requeue:被拒绝的是否重新入队列。

            channel.basicNack 与 channel.basicReject 的区别在于basicNack可以批量拒绝多条消息,而basicReject一次只能拒绝一条消息。

这里要注意一点的是,无论如何,必须对消息进行确认操作,如果不调用相关函数进行确认,则RabbitMQ会认为该程序处理能力弱,不会再发送消息到该监听程序。

还有一个问题,在启用消息手动确认模式后,发送消息的实体需要转成json字符串发送,接收消息时再把json转回对象,否则出错,也许是我还没找到直接发送实体的方法,还望指正。        

RabbitMQ的基础知识就已经介绍完了,如有错误,还望留意指正,谢谢。

0 人点赞