RabbitMQ 使用与详解
RabbitMQ参考中文文档
1. RabbitMQ原理详解
- Producer(生产者),产生消息并向RabbitMq发送消息
- Consumer(消费者),等待RabbitMq消息到来并处理消息
- Queue(队列), 依存于RabbitMQ内部, 虽然消息通过RabbitMQ在你的应用中传递,但是它们只能存储在queue中
- message acknowledgment,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除
- message durability,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。
- Prefetch Count,如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。
- Exchange(交换器),生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)
- routing key,生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效,RabbitMQ为routing key设定的长度限制为255 bytes
- Binding,RabbitMQ中通过Binding将Exchange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了
- binding key,在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key,binding key 并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。
- Exchange Type,常见的有fanout、direct、topic、headers这四种 fanout fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中
direct 把消息路由到那些binding key与routing key完全匹配的Queue中
topic
把消息路由到那些binding key与routing key模糊匹配的Queue中
匹配规则:
- routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“aa.bb.cc”
- binding key与routing key一样也是句点号“. ”分隔的字符串
- binding key中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
header
代码语言:javascript复制headers类型的Exchange不依赖于routingkey与binding key的匹配规则来路由消息,而是根据发送的消息内容中的 headers属性进行匹配。
2. 运行RabbitMQ
使用docker运行,要使用管理页面用management的版本
代码语言:javascript复制docker run -d --name rabbitmq --publish 5671:5671 --publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 rabbitmq:management
管理页面默认用户名和密码都是guest
3. 创建QUEUE
点击Queues,Add a new queue
填入queue名称保存即可
4. 创建Exchange
点击Exchanges,Add a new exchange
输入Echange名称,选择type
保存即可
5. 绑定queue和exchange
点击刚才创建的exchange,Bindings下面填入queue的名称和Routing Key即可
6. 创建springboot程序来收发消息
pom.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.mt.demo</groupId>
<artifactId>spring-boot-rabbitmq-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring-boot-rabbitmq-demo</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.14</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml
代码语言:javascript复制spring:
application:
name: rabbitmq-demo
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
publisher-confirms: true #生产者可以判断消息是否发送到了broker
publisher-returns: true #生产者可以判断消息是否发送到了queue
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
server:
port: 10001
先在RabbitMQ管理页面上创建hello的队列,并且使用绑定到topic交换器上
创建一个消费者
代码语言:javascript复制@Slf4j
@Component
@RabbitListener(queues = "hello")
public class HelloListener {
@RabbitHandler
public void process(String hello) {
log.info("Receiver: {}", hello);
}
}
创建一个生产者
代码语言:javascript复制@GetMapping("/send")
public void send(@RequestParam String topic, @RequestParam String route, @RequestParam String msg) {
log.info("send topic[{}], msg: {}", topic, msg);
rabbitTemplate.convertAndSend(topic, route, msg);
}
如果再创建一个消费者绑定同样的队列,则可以看到两个消费者交替收到消息
代码语言:javascript复制@Slf4j
@Component
@RabbitListener(queues = "hello")
public class HelloListener2 {
@RabbitHandler
public void process(String hello) {
log.info("Receiver2: {}", hello);
}
}
如果再创建一个queue和前一个使用一样的bindingkey,则发送的消息会同是发送进两个queue
配置RabbitTemplate,加入消息确认机制回调
代码语言:javascript复制@Autowired
private ReturnCallBackListener returnCallBackListener;
@Autowired
private ConfirmCallbackListener confirmCallbackListener;
@Bean
public RabbitTemplate getRabbitTemplate(CachingConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(confirmCallbackListener);
/**
* 当mandatory标志位设置为true时
* 如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息
* 那么broker会调用basic.return方法将消息返还给生产者
* 当mandatory设置为false时,出现上述情况broker会直接将消息丢弃
*/
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(returnCallBackListener);
return rabbitTemplate;
}
ConfirmCallback: ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调
ReturnCallback:ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调
完整代码参考GITHUB