RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)
消息中间件
首先来聊一聊什么是消息中间件,以及消息中间件能帮助我们解决什么问题。 消息中间件是基于队列与消息传递技术,在网络环境中为应用系统提供同步或异步、可靠的消息传输的支撑性软件系统,简单来说,消息中间件能够为我们的系统提供异步处理能力。 举个例子,某个系统在使用之前需要用户进行注册,用户在完成注册后需要发送短信和邮件提醒用户注册成功,如果按照同步的流程走,它应该是这样的:
但这样有一个问题,当用户注册成功后,我们需要等待系统发送完短信和邮件后再让用户执行下一个流程吗?其实是不需要的,用户注册成功后完全可以让其直接进入下一个流程,而发送短信和发送邮件的操作可以进行异步处理:
将这两个操作分为两个异步任务去执行,虽然提高了整体的执行效率,但用户仍然要等待两个操作中耗时最长的那个操作结束,因此,我们可以引入消息中间件:
此时用户只需要等待系统将消息放入消息中间件,至于发送短信和邮件,将由消息中间件自动完成,这样将大大提高注册的效率,提升用户体验。
消息中间件的功能远不止如此,它还能够实现应用解耦,举个例子,某电商系统在用户提交了一份订单后,订单服务会调用库存服务提供的接口进行减库存的操作,如下所示:
那么这两个服务就存在着耦合性,当库存服务的接口发生了变化,则订单服务就必须立马修改代码,否则程序都不能正常运行了,若想解决这个问题,就可以在服务中间加一个消息中间件,订单服务只需将消息写入中间件,并由库存服务去订阅这个消息即可,如下所示:
消息中间件还能够用来实现流量控制,比如非常熟悉的秒杀业务,当秒杀活动开始时的流量是非常巨大的,为了防止大量的请求直接压垮系统,可以将它们按顺序写入消息队列,再由系统慢慢地去获取请求进行处理:
一些概念
相信通过消息中间件的一些使用场景,大家已经能够对它有一个自己的认识,下面介绍一些消息中间件的概念,它们对后续的学习大有裨益。
首先消息中间件中有两个非常重要的概念:
- 消息代理
- 目的地
消息代理指的是安装了消息中间件的服务器,消息发送者会将消息先发送给消息代理,再由消息代理将消息传递到指定的目的地,至于消息的目的地,它又分为两种:
- 队列(Queue):队列指的是点对点(P2P)模式,在该模式下,一条消息只能被一个消费者消费
- 主题(Topic):主题指的是发布(Pub)/订阅(Sub)模式,在该模式下,一条消息有可能会被多个消费者消费
本篇文章将以RabbitMQ作为消息中间件的实现产品,RabbitMQ中也有几个概念需要介绍一下:
- 消息(Message):消息由消息头和消息体组成
- 消息生产者(Publisher):消息的生产者,向RabbitMQ发布消息的客户端
- 消息消费者(Consumer):消息的消费者,从RabbitMQ消费消息的客户端
- 交换器(Exchange):交换器用来接收消息生产者发送的消息,并将这些消息路由到RabbitMQ中的消息队列
- 消息队列(Queue):用于保存消息生产者生产的消息
- 绑定关系(Binding):用于交换器和消息队列之间的绑定
- 信道(Channel):多路复用连接中的一条独立的双向数据流通道
RabbitMQ的执行流程如下图:
消息生产者首先与消息代理建立一个长连接,因为需要发送消息的生产者有很多,所以通过信道传输消息,学过计网中信道复用技术的同学应该能够理解为什么这样做;消息中包含消息头和消息体,消息头中包含一个非常重要的属性,路由键(route-key);消息会先交由消息代理进行保管,消息代理将消息传给交换器,交换器类似于网络中的交换机,交换机通过连接外网,能够将网络中的数据分发至连接到该交换机中的任何节点,RabbitMQ中的交换器也是如此,它会将消息分发给某个消息队列, 交换器与消息队列之间也有着绑定关系,当路由键与某个绑定关系匹配时,交换器将会把消息分发给指定的消息队列。 而消息消费者也会与消息代理建立一个长连接,并通过信道复用,监听着某个消息队列,当该消息队列产 生了消息时,该消费者能够感知到并对其进行消费。
安装RabbitMQ
接下来就安装一下RabbitMQ,首先下载镜像:
代码语言:javascript复制docker pull rabbitmq:management
下载完成后运行起来:
代码语言:javascript复制docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
现在我们就可以通过 15672
端口访问RabbitMQ的后台了:
用户名和密码均为 guest
。
Exchange
重点介绍一下交换器,在RabbitMQ的执行流程中可以发现消息要想进入消息队列必须经过交换器,那么交换器将消息传递给消息队列就分为几种情况,对应着交换器的几种类型:
- direct
- fanout
- topic
- headers
其中direct指的是直接交换器,它会根据消息中的路由键进行精确匹配,将消息交给匹配到的某个消息队列;fanout是扇出交换器,它不会处理路由键,而是将消息交给与该交换器绑定的所有消息队列;topic是主题交换器,它会根据路由键进行模式匹配,并将消息交给匹配到的某些消息队列,也就是说,交换器和消息队列的绑定关系可以是一个模式,比如:test#,此时路由键为test、test1、test12等的消息都会由交换器交给该消息队列;headers是首部交换器,它也不会处理路由键,而是通过消息中的消息头与接收消息时的请求头进行匹配。
当RabbitMQ启动时,会为我们自动创建7个交换器:
若是想创建其它的交换器,可以点击下方的 Add a new exchange
:
其中交换器是否自动删除若是设置为Yes,则当该交换器未绑定任何消息队列时将会被自动删除;而交换器是否为内部交换器若是设置为Yes,则客户端将无法将消息发送给该交换器,相当于对外部是不可见的。 交换器创建好后,必须与消息队列进行绑定才能正常工作,所以还需要创建消息队列:
若是消息队列设置了自动删除为Yes,则当没有任何客户端监听该消息队列时会被自动删除。
交换器和消息队列都准备就绪,接下来就需要为二者建立绑定关系,点击进入新创建的交换器:
设置绑定关系:
此时在指定的交换器下可以发送消息:
直接交换器会根据路由键进行精确匹配,当消息中的路由键和交换器与消息队列绑定关系中的路由键完全一致时,该消息便会进入这个消息队列:
消息队列已经存在了一个消息正在等待消费,进入该消息队列即可看到消息的详情:
其它的交换器操作也是如此。
SpringBoot整合RabbitMQ
接下来如何将RabbitMQ整合到SpringBoot应用中才是我们关注的重点,首先创建一个SpringBoot应用,并引入依赖:
代码语言:javascript复制<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置一下RabbitMQ:
代码语言:javascript复制spring:
rabbitmq:
host: 192.168.66.10 # 主机地址
port: 5672 # 端口号
virtual-host: / # 虚拟主机,默认为 /
最后在启动类上添加@EnableRabbit注解即可。
那么如何使用代码实现交换器、消息队列的创建,消息的发送等等操作呢?
RabbitMQ提供了一个 AmqpAdmin
类来对交换器、消息队列等进行增删改查操作,代码如下:
@SpringBootTest
@EnableRabbit
class RabbitMqDemoApplicationTests {
@Autowired
private AmqpAdmin amqpAdmin;
@Test
void contextLoads() {
// 创建直接交换器
Exchange directExchange = new DirectExchange("direct-exchange");
amqpAdmin.declareExchange(directExchange);
// 创建队列
Queue queue = new Queue("test-queue");
amqpAdmin.declareQueue(queue);
// 建立绑定关系
Binding binding = new Binding("test-queue",
Binding.DestinationType.QUEUE,
"direct-exchange",
"test-key",
null);
amqpAdmin.declareBinding(binding);
}
}
创建交换器有多个重载的构造方法,全参的构造方法如下:
代码语言:javascript复制public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
super(name, durable, autoDelete, arguments);
}
方法中参数的含义分别为:
- name:交换器名
- durable:是否持久化,默认为true;若为false,则不持久化,RabbitMQ重启后将被删除
- autoDelete:是否自动删除,默认为false;若为true,则当没有消息队列与其绑定时将被自动删除
- arguments:指定参数
创建消息队列也有多个重载的方法,其全参的构造方法如下:
代码语言:javascript复制public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments) {
super(arguments);
Assert.notNull(name, "'name' cannot be null");
this.name = name;
this.actualName = StringUtils.hasText(name) ? name : Base64UrlNamingStrategy.DEFAULT.generateName() "_awaiting_declaration";
this.durable = durable;
this.exclusive = exclusive;
this.autoDelete = autoDelete;
}
参数含义为:
- name:消息队列名
- durable:是否持久化,默认为true
- exclusive:是否排他,默认为false;若为true,则当有客户端连接上该消息队列时,其它客户端将无法连接到该队列
- autoDelete:是否自动删除,默认为false
- arguments:指定参数
创建绑定关系只有一个构造方法:
代码语言:javascript复制public Binding(String destination, DestinationType destinationType, String exchange, String routingKey,
@Nullable Map<String, Object> arguments) {
super(arguments);
this.destination = destination;
this.destinationType = destinationType;
this.exchange = exchange;
this.routingKey = routingKey;
}
参数含义为:
- destination:目的地,交换器名或消息队列名
- destinationType:目的地类型,可以选择交换器和消息队列
- exchange:需要绑定的交换器名
- routingKey:路由键
- arguments:指定参数
到这里,交换器和消息队列就创建好了,绑定关系也建立了,查看后台可以进行验证:
接下来尝试发送一条消息:
代码语言:javascript复制@SpringBootTest
@EnableRabbit
class RabbitMqDemoApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void sendMessage(){
// 发送消息
rabbitTemplate.convertAndSend("direct-exchange","test-key","hello rabbitmq!");
}
}
当消息发送成功后,消息便会进入队列等待消费,那么就来看看如何消费队列中的消息:
代码语言:javascript复制@Service
public class MessageServiceImpl implements MessageService {
@RabbitListener(queues = {"test-queue"})
@Override
public void receiveMessage(Message message) {
// 获取消息体
byte[] body = message.getBody();
// 获取消息头的属性信息
MessageProperties messageProperties = message.getMessageProperties();
System.out.println(new String(body));
System.out.println(messageProperties);
}
}
在某个业务方法上添加@RabbitListener注解,值为需要监听的消息队列名,然后启动项目,此时该方法将监听test-queue队列,当队列中产生了消息时,便会立马对消息进行消费,并将消息封装到方法的入参中,输出结果为:
代码语言:javascript复制hello rabbitmq!
MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=direct-exchange, receivedRoutingKey=test-key, deliveryTag=1, consumerTag=amq.ctag-2Y8tYT5SDk0tartAFys6tg, consumerQueue=test-queue]
然而当你发送的是一个对象的JSON数据时,在获取到消息内容后,还需要使用JSON工具将其转为对象,为此,Spring帮我们简化了这一过程,只需将对象的类型作为参数传入方法中,即可直接得到该对象:
代码语言:javascript复制@RabbitListener(queues = {"test-queue"})
@Override
public void receiveMessage(Message message,Person peson) {
System.out.println(peson);
}