RabbitMq是我们在开发过程中经常会使用的一种消息队列。今天我们来研究研究rabbitMq的使用。
rabbitMq的官网: rabbitmq.com/
rabbitMq的安装这里先略过,因为我尝试了几次都失败了,后面等我安装成功了会把详细的文章发出来。目前是使用公司的环境进行的调试。
1. 一些概念
RabbitMQ是一个开源的消息代理和队列服务器,用来实现各个应用服务间的数据共享(跨平台 ,跨语言)。RabbitMQ是使用erlang语言编写的,并且基于AMQP协议实现。
所有的消息队列产品模型抽象上来说,都是类似的过程。生产者创建消息,然后发布到消息队列中,由消费者进行消费。
而rabbitMQ也是类似的,有生产者,消费者角色。其内部结构如下图所示。
那么接下来我们就来介绍一下RabbitMQ中的这些概念。
1. Message:
消息,就是我们需要传递和共享的信息,消息由一些列的可选属性组成,包括路由键,优先级,是否持久化等信息
2. Publisher
消息的生产者,也是一个向交换机发布消息的客户端应用程序。
3. Exchange:
交换机,这是RabbitMQ中的一个非常重要的概念,在rabbitMq中,生产者产生的消息都不是直接发送到队列中去的,而是发送到了交换机中,交换机会通过一定的规则绑定队列,交换机会根据相应的路由规则发送给对服务器中的队列。
4. Binding:
绑定, 用于交换机和消息列队之间的关联。一个绑定就是基于路由键(routing-key)将交换机和消息队列连接起来的路由规则。所以可以将交换机理解成一个有绑定有成的路由表。
5. Queue:
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可以投入一个或多个队列中。消息一直在对队列里边,等待消费者连接到这个队列将其消费。
6. Connection:
网络连接,比如一个TCP连接。
7. Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是简历在真实的TCP连接内的虚拟连接。AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成的。因为对于操作系统过来说建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。
8. Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用。
9. Virtual Host
虚拟主机,标识一批交换机、消息队列和相关对象。 虚拟主机是相同的身份认证和加密环境的独立服务器域。 每个vhost本质就是一个mini版的rabbitMQ服务器,拥有自己的队列,交换机,绑定和权限机制。vhost是AMQP概念的基础,必须在连接时指定,RabbitMQ的默认vhost是/.
10. Broker
标识消息队列服务器实体。
2. Exchange类型
Exchange分发消息的时候根据类型的不同分发策略有所区别,目前常见的有四种类型: direct、fanout、topic、headers。 headers匹配AMQP消息的header而不是路由键,此外headers交换机和direct交换机完成一直但是性能差很多,几乎用不到了,所以直接看另外三种类型。
2.1 direct交换机
消息中的路由键(routing key)如果和Binding中的bing key一致,交换机就将消息发送到队列的队列中。路由键要完全匹配,单个传播。
2.2 fanout
每个发到fanout类型交换机的消息都会分到所有绑定的队列上去。fanout交换器不处理路由键,只是简单的将队列绑定到交换机上,每个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout类型转发消息是最快的。
2.3 topic
topic交换机通过模式匹配分配路由的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用.隔开。它同样会识别两个通配符: # 和* 。 #匹配0个或多个单词, * 匹配一个单词
3. springBoot集成RabbitMQ
SpringBoot集成rabbitMQ还是比较简单的,因为springBoot使用RabbitTemplate对常用操作进行了封装。
接下来我们来看一下集成过程。首先导入依赖。
代码语言:javascript复制<!-- 无需在parent的配置文件中添加 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
然后在springBoot的配置文件 application.yml中配置rabbitMQ连接信息
代码语言:javascript复制server:
port: 7890
spring:
rabbitmq:
host: 172.15.33.52
port: 5672
username: root
password: 123456
接下来我们我们分三种交换机进行演示。
3.1 direct
首先是配置类,在配置类中我们需要声明交换机,队列和绑定关系。
代码语言:javascript复制@Configuration
public class DirectExchangeConfig {
public static final String DIRECT_QUEUE = "directQueue";
public static final String DIRECT_QUEUE2 = "directQueue2";
public static final String DIRECT_EXCHANGE = "directExchange";
public static final String DIRECT_ROUTING_KEY = "direct";
@Bean
public Queue directQueue() {
return new Queue(DIRECT_QUEUE, true);
}
@Bean
public Queue directQueue2() {
return new Queue(DIRECT_QUEUE2, true);
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange(DIRECT_EXCHANGE, true, false);
}
@Bean
public Binding bindingDirectExchange(Queue directQueue, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY);
}
@Bean
public Binding bindingDirectExchange2(Queue directQueue2, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue2).to(directExchange).with(DIRECT_ROUTING_KEY);
}
}
这里我们创建了一个叫directExchange的交换机,绑定了directQueue和directQueue2两个队列,路由键是direct.
消息的生产者,我们通过一个Controller来进行模拟,直接引用rabbitTemplate
代码语言:javascript复制@RestController
@Slf4j
@RequestMapping("/direct")
public class DirectController {
private final RabbitTemplate rabbitTemplate;
public DirectController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
/**
* direct交换机为直连模式交换机
* 根据消息携带的路由键将消息投递给对应队列
*
*
* @return
*/
@GetMapping("send")
public Object sendMsg() {
rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE, DirectExchangeConfig.DIRECT_ROUTING_KEY, "发送一条测试消息:direct");
return "direct消息发送成功!!";
}
}
当我在浏览器访问对应连接的时候,就会生产一条消息发送到directExchange交换机,路由key为:direct, 消息内容为:发送一条测试消息:direct
接下来我们来看消息的消费者。
代码语言:javascript复制package com.lsqingfeng.action.rabbitmq.direct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @className: DirectQueueListener
* @description: 直连交换机的监听器
* @author: sh.Liu
* @date: 2021-08-23 16:03
*/
@Slf4j
@Component
public class DirectQueueListener {
/**
* 尽管设置了两个消费者,但是只有一个能够消费成功
* 多次发送则轮训消费:
* DirectReceiver消费者收到消息1 : 发送一条测试消息:direct
* DirectReceiver消费者收到消息2 : 发送一条测试消息:direct
* DirectReceiver消费者收到消息1 : 发送一条测试消息:direct
* DirectReceiver消费者收到消息2 : 发送一条测试消息:direct
*
* 一个交换机可以绑定多个队列。如果通过路由key可以匹配到多个队列,消费的时候也只能有一个进行消费
* @param testMessage
*/
@RabbitHandler
@RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE)
public void process(String testMessage) {
System.out.println("DirectReceiver消费者收到消息1 : " testMessage);
}
@RabbitHandler
@RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE)
public void process2(String testMessage) {
System.out.println("DirectReceiver消费者收到消息2 : " testMessage);
}
@RabbitHandler
@RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE2)
public void process3(String testMessage) {
System.out.println("DirectReceiver消费者收到消息3 : " testMessage);
}
}
当我们访问浏览器生产消息会,观察控制台结果:
DirectReceiver消费者收到消息1 : 发送一条测试消息:direct DirectReceiver消费者收到消息3 : 发送一条测试消息:direct
在发送一次:
DirectReceiver消费者收到消息3 : 发送一条测试消息:direct DirectReceiver消费者收到消息2 : 发送一条测试消息:direct
由于我们又两个队列都绑定了交换机,且routeKey一样,所以会打印两条。要注意direct只有routeKey完全匹配的时候才能被消费,同时每个队列中的消息只会 被消费一次。
3.2 fanout
配置类:
代码语言:javascript复制@Configuration
public class FanoutExchangeConfig {
public static final String FANOUT_QUEUE = "fanoutQueue";
public static final String FANOUT_QUEUE2 = "fanoutQueue2";
public static final String FANOUT_QUEUE3 = "fanoutQueue3";
public static final String FANOUT_EXCHANGE = "fanoutExchange";
public static final String FANOUT_ROUTING_KEY = "fanout";
@Bean
public Queue fanoutQueue() {
return new Queue(FANOUT_QUEUE, true);
}
@Bean
public Queue fanoutQueue2() {
return new Queue(FANOUT_QUEUE2, true);
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE, true, false);
}
@Bean
public Binding bindingFanoutExchange(Queue fanoutQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
}
@Bean
public Binding bindingFanoutExchange2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
这里也是用一个Fanout类型的交换机绑定了两个队列,要注意在这种模式下,是不需要指定routing-Key的,因为所有绑定的队列都会收到消息。
生产者代码如下:
代码语言:javascript复制@RestController
@Slf4j
@RequestMapping("/fanout")
public class FanoutController {
private final RabbitTemplate rabbitTemplate;
public FanoutController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
/**
* fanout交换机为扇形模式交换机
* 消息会发送到所有绑定的队列上。
* @return
*/
@GetMapping("send")
public Object sendMsg() {
rabbitTemplate.convertAndSend(FanoutExchangeConfig.FANOUT_EXCHANGE, null, "发送一条测试消息:fanout");
return "fanout消息发送成功!!";
}
}
消息的消费者:
代码语言:javascript复制@Slf4j
@Component
public class FanoutQueueListener {
/**
* fanout交换机: 扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列
* 同一个队列监听多次,只会消费一次。
* 交换机绑定的多个队列都可以收到消息
* @param testMessage
*/
@RabbitHandler
@RabbitListener(queues = FanoutExchangeConfig.FANOUT_QUEUE)
public void process(String testMessage) {
System.out.println("FanoutReceiver消费者收到消息1 : " testMessage);
}
@RabbitHandler
@RabbitListener(queues = FanoutExchangeConfig.FANOUT_QUEUE)
public void process2(String testMessage) {
System.out.println("FanoutReceiver消费者收到消息2 : " testMessage);
}
@RabbitHandler
@RabbitListener(queues = FanoutExchangeConfig.FANOUT_QUEUE2)
public void process3(String testMessage) {
System.out.println("FanoutReceiver消费者收到消息3 : " testMessage);
}
}
打印结果:
FanoutReceiver消费者收到消息1 : 发送一条测试消息:fanout FanoutReceiver消费者收到消息3 : 发送一条测试消息:fanout
因为方法1和方法2监听的是同一个队列,只有一个可以消费成功。多次执行,两个方法交替执行。
3.3 topic
主题交换机,会根据routing-Key的匹配规则,将消息发送到符合规则的队列中。
配置类:
代码语言:javascript复制/**
* @className: TopicExchangeConfig
* @description:
* * (星号) 用来表示一个单词 (必须出现的)
* # (井号) 用来表示任意数量(零个或多个)单词
* @author: sh.Liu
* @date: 2021-08-23 15:49
*/
@Configuration
public class TopicExchangeConfig {
public static final String TOPIC_QUEUE = "topicQueue";
public static final String TOPIC_QUEUE2 = "topicQueue2";
public static final String TOPIC_QUEUE3 = "topicQueue3";
public static final String TOPIC_EXCHANGE = "topicExchange";
public static final String TOPIC_ROUTING_KEY = "topic*";
@Bean
public Queue topicQueue() {
return new Queue(TOPIC_QUEUE, true);
}
@Bean
public Queue topicQueue2() {
return new Queue(TOPIC_QUEUE2, true);
}
@Bean
public Queue topicQueue3() {
return new Queue(TOPIC_QUEUE3, true);
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE, true, false);
}
@Bean
public Binding bindingTopicExchange(Queue topicQueue, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueue).to(topicExchange).with("topic.#");
}
@Bean
public Binding bindingTopicExchange2(Queue topicQueue2, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueue2).to(topicExchange).with("test.#");
}
@Bean
public Binding bindingTopicExchange3(Queue topicQueue3, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueue3).to(topicExchange).with("#");
}
}
这里要注意我们的绑定管关系。分别是topic.#, test.*, #
#: 代表所有,* 代表有且只有一个。
消息的发送者,我们将routingKey作为参数方便我们看效果:
代码语言:javascript复制@RestController
@Slf4j
@RequestMapping("/topic")
public class TopicController {
private final RabbitTemplate rabbitTemplate;
public TopicController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@GetMapping("send")
public Object sendMsg(String routingKey) {
rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, routingKey, "发送一条测试消息:topic");
return "topic消息发送成功!!";
}
}
消息的消费者:
代码语言:javascript复制/**
* @className: TopicQueueListener
* @description: 主题交换机的监听器
* @author: sh.Liu
* @date: 2021-08-23 16:03
*/
@Slf4j
@Component
public class TopicQueueListener {
/**
* topic: 主题交换机
* @param testMessage
*/
@RabbitHandler
@RabbitListener(queues = TopicExchangeConfig.TOPIC_QUEUE)
public void process(String testMessage) {
System.out.println("TopicReceiver消费者收到消息1 : " testMessage);
}
@RabbitHandler
@RabbitListener(queues = TopicExchangeConfig.TOPIC_QUEUE)
public void process2(String testMessage) {
System.out.println("TopicReceiver消费者收到消息2 : " testMessage);
}
@RabbitHandler
@RabbitListener(queues = TopicExchangeConfig.TOPIC_QUEUE2)
public void process3(String testMessage) {
System.out.println("TopicReceiver消费者收到消息3 : " testMessage);
}
@RabbitHandler
@RabbitListener(queues = TopicExchangeConfig.TOPIC_QUEUE3)
public void process4(String testMessage) {
System.out.println("TopicReceiver消费者收到消息4 : " testMessage);
}
}
请求:http://localhost:7890/topic/send?routingKey=test.a
结果:
TopicReceiver消费者收到消息3 : 发送一条测试消息:topic TopicReceiver消费者收到消息4 : 发送一条测试消息:topic
代表: test.* 和 # 与路由key匹配成功
请求:http://localhost:7890/topic/send?routingKey=topic.123
TopicReceiver消费者收到消息1 : 发送一条测试消息:topic TopicReceiver消费者收到消息4 : 发送一条测试消息:topic
代表: topic.# 和 # 匹配成功
请求: http://localhost:7890/topic/send?routingKey=test
TopicReceiver消费者收到消息4 : 发送一条测试消息:topic
test.* 后面必须要有一个单词
请求: http://localhost:7890/topic/send?routingKey=test.aaa
TopicReceiver消费者收到消息4 : 发送一条测试消息:topic TopicReceiver消费者收到消息3 : 发送一条测试消息:topic
test.*和 #匹配成功
请求:http://localhost:7890/topic/send?routingKey=test.aaa.b
TopicReceiver消费者收到消息4 : 发送一条测试消息:topic
只对# 匹配成功, 因为test.*只能匹配一个单词,aaa.b代表两个