消息队列-RabbitMQ
在微服务的使用中,我们不可避免需要服务之间的相互调用,但传统模式下,我们使用如OpenFeign的调用方式,需要等待被调用方直接业务并返回结果后,才能进行后续任务,此时,调用者会处于阻塞状态。
这种调用称为同步调用也叫同步通讯,但实际场景,有很多都需要我们实现异步通讯。而消息队列就是实现异步通讯的方式
两种方式各有优劣,如果我们的业务需要实时得到服务提供方的响应,则应该选择同步通讯(同步调用)。而如果我们追求更高的效率,并且不需要实时响应,则应该选择异步通讯(异步调用)。
异步调用
异步调用方式其实就是基于消息通知的方式,一般包含三个角色:
- 消息发送者:投递消息的人,就是原来的调用方
- 消息Broker:管理、暂存、转发消息,你可以把它理解成微信服务器
- 消息接收者:接收和处理消息的人,就是原来的服务提供方
在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息Broker。然后接收者根据自己的需求从消息Broker那里订阅消息。每当发送方发送消息后,接受者都能获取消息并处理。 这样,发送消息的人和接收消息的人就完全解耦了。
RabbitMQ使用
RabbitMQ本体的安装我们就省略掉了,直接进入SpringBoot应用RabbitMQ的方法。
因为RabbitMQ遵循AMQP协议,因此任何语言只要遵循AMQP协议,都可以与RabbitMQ交互。
我们导入依赖:
代码语言:javascript复制 <!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置环境变量:
代码语言:javascript复制spring:
rabbitmq:
host: 192.168.150.101 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /root # 虚拟主机
username: root # 用户名
password: 123 # 密码
之后即可进行消息的收发:
发送消息:
代码语言:javascript复制@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
接收消息
代码语言:javascript复制@Component
public class SpringRabbitListener {
// 利用RabbitListener来声明要监听的队列信息
// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
// 可以看到方法体中接收的就是消息体的内容
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" msg "】");
}
}
需要注意的是,这里我们只是绑定了队列,实际上并没有创建队列,需要手动创建队列,或者创建配置类,并申请为Bean来实现队列自动管理。
代码语言:javascript复制@Configuration
public class RabbitConfig {
@Bean
public Queue Queue(){
return new Queue("test");
}
}
此时如果我们存在多个消费者,消费者之间会平分消息,但在实际场景下,不同消费者性能不同,其消费速度也有区别,平分消息可能会导致较快的那个消费者空闲,从而影响了整体性能,因此,我们需要一种“能者多劳”的方式去分配。
我们只需对配置进行简单设置即可
代码语言:javascript复制spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
这样处理较快的消费者就能去处理更多消息,可以有效避免消息积压。
这种让多个消费者绑定到一个队列,共同消费队列中的消息叫做 Work queues,任务模型
其使用需要注意:
- 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量
交换机类型
在之前的代码中,我们并没有引入交换机,而是生产者直接发送消息到消息队列,而如果引入交换机,我们可以通过将消息发送给交换机,交换机再通过自己的规则发送消息到绑定的队列中。
RabbitMQ交换机有四种:
- Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
- Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
- Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
- Headers:头匹配,基于MQ的消息头匹配,用的较少。
消息发送可以:
代码语言:javascript复制@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "hmall.direct";
// 消息
String message = "最新报道,哥斯拉是居民自治巨型气球,虚惊一场!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}
在这里,发送方法的第一个参数是交换机名称,第二个为路由键,第三个为发送的消息。
需要注意的是,
Topic和Direct类似都可以根据路由键把消息路由到不同队列,但Topic可以在绑定键上使用通配符,即使用#和*两个符号:
#
:代表0个或多个词*
:代表1个词
总结来说:
交换机的作用是什么?
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
- FanoutExchange的会将消息路由到每个绑定的队列
描述下Direct交换机与Fanout交换机的差异?
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
描述下Direct交换机与Topic交换机的差异?
- Topic交换机接收的消息RoutingKey必须是多个单词,以
**.**
分割 - Topic交换机与队列绑定时的bindingKey可以指定通配符
#
:代表0个或多个词*
:代表1个词
声明队列和交换机
在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。 因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。
SpringAMQP提供了一个Queue类,用来创建队列
SpringAMQP还提供了一个Exchange接口,来表示所有不同类型的交换机:
我们可以自己创建队列和交换机,不过SpringAMQP还提供了ExchangeBuilder来简化这个过程
而在绑定队列和交换机时,则需要使用BindingBuilder来创建Binding对象
代码语言:javascript复制@Configuration
public class DirectConfig {
/**
* 声明交换机
* @return Direct类型交换机
*/
@Bean
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange("hmall.direct").build();
}
/**
* 第1个队列
*/
@Bean
public Queue directQueue1(){
return new Queue("direct.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
}
/**
* 第2个队列
*/
@Bean
public Queue directQueue2(){
return new Queue("direct.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
}
}
同时,Spring还提供了注解方式来声明:
代码语言:javascript复制@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到direct.queue1的消息:【" msg "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者2接收到direct.queue2的消息:【" msg "】");
}
消息转换器
Spring的消息发送代码接收的消息体是一个Object,
而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。 只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
- 数据体积过大
- 有安全漏洞
- 可读性差
因此我们需要使用其他转换器,如JSON转换器:
在publisher
和consumer
两个服务中都引入依赖:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
注意,如果项目中引入了spring-boot-starter-web
依赖,则无需再次引入Jackson
依赖。
配置消息转换器,在publisher
和consumer
两个服务的启动类中添加一个Bean即可:
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}