RabbitMQ支持AMQP协议。AMQP(Advanced Message Queue Protocal)高级消息队列协议是进程间传递异步消息的网络协议。在分布式系统中,除了RPC远程服务调用外,消息队列也是一个重要的通信手段。
1 概述
1.1 基本组成
RabbitMQ中相关核心概念如下:
- Broker:消息队列服务主机
- Exchange:消息交换机,指定消息按某种规则、路由到某个队列
- Queue:消息队列载体,每个消息都会被投入到一个或多个队列
- Binding:绑定,将Exchange与Queue按照某个路由规则绑定起来
- RoutingKey:路由关键字,Exchange根据该关键字进行消息投递
- Vhost:虚拟主机,做资源隔离。
- Connection:客户端与主机建立的TCP连接。
- Channel:消息通道,在同一个Connection中可以有多个Channel,每个Channel都会被指派一个唯一的ID。
1.1.1 交换机(Exchange)
交换机有四种类型:direct exchange、topic exchange、fanout exchange、headers exchange。
- direct exchange:Exchange与Queue进行绑定时,需设置相应的RoutingKey。只有当
RoutingKey
完全匹配时,才会进行消息投递。 - topic exchange:与
direct exchange
类似,只不过在匹配时是模糊匹配。RoutingKey
中的(#
表示匹配一个或多个单词;*
匹配一个单词)。 - fanout exchange:广播模式,将消息路由到所有绑定的队列中,无需进行
RoutingKey
的匹配。 - headers exchange:根据消息中的headers属性匹配投递。
其他属性:
- Durability:Durable(持久的),Transiant(短暂的)。
- Auto delete:当有队列或交换机绑定了本交换机,然后队列或者交换机都又进行了解绑后,自动删除
- Internal:是否为内部使用。true表示为内部交换机,客户端无法直接向该交换机发送消息。
- Arguments:
- alternate-exchange:备份交换机,当消息无法路由到具体队列时,将交给备份交换机处理。
1.1.2 队列(Queue)
消息只能存储至队列中,多个消费者可以订阅同一个队列。
基本属性:
- Type:Classic,Quorum
- Durability:Durable(持久的),Transiant(短暂的)。
- Auto delete:当有消费者订阅,然后所有的消费者又都断开连接后,自动删除。
- Arguments:
- x-message-ttl:Message在Queue中存活的时间,超过将被丢弃。
- x-expires:Queue在没有被使用的情况下,过期时间。
- x-max-length:Queue中可以存放的最大消息数,超过将被丢弃。
- x-max-length-bytes:Queue中可以存放总消息体大小,超过将被丢弃。
- x-overflow:Queue溢出后的行为:
drop-head
,reject-publish
orreject-publish-dlx
。 - x-single-active-consumer:确保只有一个消费者订阅,当出现异常时自动转向另外一个。
- x-max-priority:队列支持的最大优先级,不设置将不支持。
- x-dead-letter-exchange:当Message被
rejected
或expire
时,将会重新发布到哪个交换机。 - x-dead-letter-routing-key:当某个消息是死信时,使用的
RoutingKey
,未设置的话则使用原始Key。 - x-queue-mode:当设置为
lazy
时,会将消息尽可能多的放置到磁盘上,以减少内存使用。 - x-queue-master-locator:将队列设置为主节点定位模式。
2 使用
2.1 如何保证消息不丢失
2.1.1 消息发送确认机制(Confirm机制)
发布者确认机制是RabbitMQ的扩展,可以实现可靠的发布。在通道上开启后,客户端将会收到异步确认消息。
这块其实有3种方式实现:基于RabbitMQ事务、同步确认方式、异步确认方式。一般使用异步确认方式,来保证消息的可靠发送。
同时,在Exchange
向Queue
投递消息时,可能会出现无法投递的情况。此时可以设置alternate-exchange
备份交换机兜底,也可以使用Return
机制,当出现无法投递的情况,进行回调。
public class AmqpMsgUtils implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
RabbitTemplate rabbitTemplate;
public AmqpMsgUtils(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
this.rabbitTemplate.setConfirmCallback(this);
this.rabbitTemplate.setReturnCallback(this);
}
public void sendMsg(String msg, String topic) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
this.rabbitTemplate.convertAndSend(Consts.SP_EXCHANGE, topic, msg, correlationData);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// TODO: 具体发送异常处理
System.out.println("confirm: " correlationData ", " ack ", " cause);
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// TODO: 具体投递异常处理
System.out.println("returnedMessage: " message);
}
}
2.1.2 MQ持久化
声明交换机、队列时,设置其durable
为true
。并将消息的Delivery mode
设置为2-persistent
持久化。
2.1.3 手动ACK
由于网络不可靠及应用程序可能失败,因此通常需要进行某种处理确认。RabbitMQ里面有两种确认方式:一种是确认已经收到消息这一事实,另一种是确认消息已由消费者处理和验证。在需要确保消息不能丢失的场景下,通常使用手动Ack模式。
代码语言:javascript复制@Component
@RabbitListener(queues = {Consts.SP_QUEUE_A, Consts.SP_QUEUE_B}, ackMode = "MANUAL")
public class AmqpQueueHandler {
@RabbitHandler
public void processHandler(String msg, Channel channel, Message message) {
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
// 为`true`时 说明消息被重新投递过,处理失败则进行`reject`。此时可进入`死信队列`或`写日志/数据库`
if (message.getMessageProperties().getRedelivered()) {
// TODO: 消息无法正常消费,异常处理
System.out.println("Redelivered: " "xxx");
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException ioException) {
ioException.printStackTrace();
}
} else {
// 消息首次消费异常 进行`ReQueue`操作
System.out.println("Error ReQueue: " "xxx");
try {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
}
}
2.1.4 配置RabbitMQ镜像集群
镜像集群,避免单点故障。
2.2 消息重复消费(幂等性)
消息重复不可避免。如发送失败重试,消费异常发生了ReQueue
操作等情况。所以保持消费端的幂等性很重要。
消息有重复的前提是消息之间能判断是否相等,即消息也有类似haseCode()
的东西,假设为mid
。
现在无非就是如何判重的问题:
- 若为数据库插入操作,可设置
mid
为唯一索引,进行去重。 - 消费端也可建立专门的去重表。也可借助
Redis
等中间件进行去重。
2.3 消息堆积
消息堆积的出现无非只有一种情况,消费者的消费速度赶不上生产者的生产速度。原因可能有很多,比如消费者出现异常导致消费变慢、生产突然产生大量消息。解决这个问题,要找到问题的源头也就是原因。
解决方案要考虑生产者和消费者。发生消息堆积后,生产者服务降级,关闭一些非核心业务,减少消息的产生。消费者优化性能,临时将堆积消息转移至新队列,临时增加消费者去消费。
同时针对RabbitMQ,手动ACK时,可以设置prefetch_count
来使消费者根据自己的消费能力进行消费,避免出现消费能力弱的消费者堆积消息。
2.4 延时任务
主要利用了死信队列。将消息发送到一个没有消费者的队列中,并设置x-message-ttl
,以及x-dead-letter-exchange
和x-dead-letter-routing-key
。当消息过期时,消息会被投放至相应的交换机中,配置交换机路由至具体的队列中,再消费该队列中的消息即可。
3 应用场景
3.1 异步处理
如商城用户下单场景,订单支付完成后需向用户发送订单变化站内消息、以及增加积分等操作。可以在用户完成主流程后,向队列推送订单支付消息,由站内信系统
和积分系统
订阅该消息,进行后续处理,无需阻塞订单支付主流程,进而提高接口响应速度。
3.2 应用解耦
如用户注册场景,需同步向CRM
系统写入,并向账号系统
初始化用户账户信息。在使用消息队列解耦后,能避免因其他系统故障导致注册不成功的情况出现。用户注册成功后,将消息写入队列,即返回用户注册成功。其他系统订阅注册消息,然后进行相应的业务操作。
3.3 流量削峰
如秒杀场景,一般会因为流量过大影响响应速度,甚至是宕机。此时可以加入消息队列,对请求进行缓冲。用户的请求先写入消息队列,设置消息队列的最大值,超过阈值则直接丢弃或跳转错误页,后续应用可根据自己的能力进行业务处理。
4 集群
RabbitMQ的集群有两种模式:普通模式、镜像模式。
4.1 普通模式
元数据信息在所有节点上一致,但是队列的完整内容只存在创建它的节点上,各个节点只有相同的队列元数据。若生产者将消息发送至A节点后,消费者从B节点获取数据,RabbitMQ会临时在节点B和节点A之间进行消息传输。这种模式存在单点故障,一般不会使用。
4.2 镜像模式
镜像模式在普通模式基础上,将需要的队列做成镜像队列
。该模式会主动的将镜像队列的消息实体在各镜像队列节点间进行同步。但该模式会降低系统性能,若镜像队列过多还会消耗大量的内部带宽。一般对可靠性要求比较高的场景建议采用镜像模式。
- https://www.rabbitmq.com/documentation.html