代码语言:javascript复制
rabbitmq:3.9.2
spring-boot-starter-amqp:2.3.0.RELEASE
架构
- Producer 生产者
- 消息成功发送到交换机, 会触发回调事件ConfirmCallback(需要配置)
- 消息不能被交换机转发到队列中时, 会触发回调事件ReturnCallback(需要配置)
- Exchange 交换器
- Fanout 把所有发送到该交换器的消息路由到所有与该交换器绑定的消息队列中. 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的
- Direct 把消息路由到BingingKey 和RoutingKey 完全匹配的队列中 会把消息路由到BindingKey 和RoutingKey 完全匹配的队列中. 如果一个队列绑定到该交换机上要求路由键"dog",则只有被标记为"dog"的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog, 多个队列可以使用一个键
- Topic Topic 类型的交换器在 direct 匹配规则上进行了扩展. 不是进行完全匹配. 而是进行模糊匹配. 匹配规则如下: BindingKey和RoutingKey一样都是由"."分隔的字符串; BindingKey中可以存在两种特殊字符"*" 和 "#", 用于模糊匹配, 其中"*"用于匹配一个单词, "#"用于匹配多个单词(可以是0个), 因此"audit.#"能够匹配到"audit.irs.corporate",但是"audit.*" 只会匹配到"audit.irs"
- Headers Handers 类型的交换器不是根据路由匹配规则来的,而是根据消息中的 headers 属性进行匹配的。在绑定队列和交换器时指定一组键值对,当发送的消息到交换器时,RabbitMQ会获取到该消息的headers,对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果匹配,消息就会路由到该队列。headers类型的交换器性能很差,不实用
- Bindings 绑定(匹配)器 把exchange和queue按照路由规则绑定(匹配)起来
- BindingKey 队列绑定的key, 初始化时绑定
- RoutingKey 消息携带的
- Queues 队列
- Consumer 消费者
其他概念
- Broker 提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输
- Vhost 虚拟主机,一个broker里可以有多个vhost, 用作不同用户的权限分离
- Channel 消息通道, 可以理解为建立在生产者/消费者和RabbitMQ服务器之间的TCP连接上的虚拟连接,一个TCP连接上可以建立多个Channel
特性
消息确认
- 自动确认
- RabbitMQ成功将消息发出 (即消息成功写入TCP Socket) 中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递
- 手动确认
- basic.ack 肯定确认
- basic.nack 用于否定确认 (注意: 这是AMQP 0-9-1的RabbitMQ扩展)
- basic.reject用于否定确认, 但与basic.nack相比有一个限制: 一次只能拒绝单条消息
- 使用手动确认需要对客户端进行配置.
- 第2, 3种, 都可以通过设置参数, 将消息重新放回到队列中
TTL
- 时间范围
0 <= n <= 2^32-1
ms, 约 49 天
死信队列
- 可以和TTL配合实现延时队列 将消息设置ttl, 发送到死信队列中 (不设置消费者处理), 等待过期被转发到延时队列
- 但该延时队列有缺陷, 若发送两条延时消息, 第一条延时10s, 第二条延时5秒, 若第一条先入队列, 则只有当第一条消息过期发送到死信队列后, 第二条消息才能被处理, 即过期是阻塞的. 但可以通过安装
rabbitmq_delayed_message_exchange
插件解决, https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases 终端启用插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 代码中配置
/**
*
* @author https://www.skypyb.com/
*/
@Configuration public class RabbitBindConfig {
public final static String SKYPYB_DELAY_EXCHANGE = "skypyb-delay-exchange";
public final static String SKYPYB_DELAY_QUEUE = "skypyb-delay-queue";
public final static String SKYPYB_DELAY_KEY = "skypyb.key.delay";
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
//自定义交换机
return new CustomExchange(SKYPYB_DELAY_EXCHANGE, "x-delayed-message", false, true, args);
}
@Bean
public Queue delayQueue() {
return new Queue(SKYPYB_DELAY_QUEUE, false, false, true);
}
@Bean
public Binding bindingDelayExchangeAndQueue() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(SKYPYB_DELAY_KEY).noargs();
}
}
消息重试
- 生产者重试 # 开启重试机制 spring.rabbitmq.template.retry.enabled=true # 重试起始间隔时间 spring.rabbitmq.template.retry.initial-interval=1000ms # 最大重试次数 spring.rabbitmq.template.retry.max-attempts=10 # 最大重试间隔时间 spring.rabbitmq.template.retry.max-interval=10000ms # 间隔时间乘数 (这里配置间隔时间乘数为 2, 则第一次间隔时间 1 秒, 第二次重试间隔时间 2 秒, 第三次 4 秒, 以此类推) spring.rabbitmq.template.retry.multiplier=2
- 消费者重试
- spring.rabbitmq.listener.simple.acknowledge-mode: auto
- spring.rabbitmq.listener.simple.retry.enabled=true: 若业务方法抛出异常, 则消费者端会根据配置进行有限次数的重试, 超过次数仍没有消费成功则将消息ack, 这种情况下的重试, 仅是消费者内部进行的重试, 消息并不会再次进入当前监听的队列
- spring.rabbitmq.listener.simple.retry.enabled=false, 若业务方法抛出异常, 消息会重新入列(重新入列策略在下文说明), 进行重试, 在rabbitmq management对应的queue页面中, 可以观察到Redelivered有值, 在该模式下, 消息会一直进行重试, spring-boot-starter-amqp包会对异常进行处理, 自动调用nack方法(org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#rollbackOnExceptionIfNecessary)
- spring.rabbitmq.listener.simple.acknowledge-mode: manual
- RabbitMQ一直没有收到消费者的确认信号,并且消费此消息的消费者己经断开连接,则RabbitMQ会安排该消息重新进入队列,等待投递给下一个消费者
- 当业务方法抛出异常时, 当前消费者会被阻塞, 当前队列的其他消费者不受影响, 若spring.rabbitmq.listener.simple.retry.enabled=true, 重试指定次数后, 继续阻塞
- spring.rabbitmq.listener.simple.acknowledge-mode: none
- 重试策略与auto基本一致, 但这种模式下, 是发送即忘的, 即队列将消息写入tcp通道后完成后, 就认为该消息已确认
- 也可以配置spring.rabbitmq.listener.simple.retry.enabled=true进行消费端的重试
消息重新入列
消息重新入列是在队列头部
Demo验证
代码语言:javascript复制 @RabbitHandler
public void process(Map<String, Object> testMessage, Channel channel, Message message) throws Exception {
Thread.sleep(3000);
String data = testMessage.get("messageData").toString();
log.info("DirectReceiver消费者收到消息: {}", data);
if (data.contains("2")) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} else {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
消费者端配置手动确认
消费线程数为1
生产者发送6条消息, messageData分别为1, 2, 3, 4, 5, 6
prefetch设为100
, 消费情况如下
DirectReceiver消费者收到消息: 1
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 3
DirectReceiver消费者收到消息: 4
DirectReceiver消费者收到消息: 5
DirectReceiver消费者收到消息: 6
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
...
prefetch设为3
, 消费情况如下
DirectReceiver消费者收到消息: 1
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 3
DirectReceiver消费者收到消息: 4
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 5
DirectReceiver消费者收到消息: 6
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
...
prefetch设为1
, 消费情况如下
DirectReceiver消费者收到消息: 1
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
...
事务性
1. 生产者 保证消息发送的原子性
事务与异步确认机制是冲突的, 只能启用其中一个
配置
代码语言:javascript复制@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//启用通道事务性
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
@Bean("rabbitTransactionManager")
public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
使用 发送方法上加注解
代码语言:javascript复制@Transactional(rollbackFor = Exception.class, transactionManager = "rabbitTransactionManager")
开启事务模式之后,RabbitMQ 生产者发送消息会多出几个步骤:
- 客户端发出请求,将信道设置为事务模式
- 服务端给出回复,同意将信道设置为事务模式
- 客户端发送消息
- 客户端提交事务
- 服务端给出响应,确认事务提交
2. 消费者 保证消息消费的原子性
配置
代码语言:javascript复制@Bean
public AbstractRabbitListenerContainerFactory<SimpleMessageListenerContainer>
configure(AbstractRabbitListenerContainerFactory<SimpleMessageListenerContainer> factory) {
factory.setChannelTransacted(true);
return factory;
}
使用 同生产者一致
消费者若启用事务, 则spring.rabbitmq.listener.simple.acknowledge-mode最好为auto, 若为manual, rollback后, 会阻塞当前消费者, 消息一直为unacked状态
高可用
- 消息集群 多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker
- 镜像队列 可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用
- 跟踪机制 如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么
- 持久化 交换机/队列/消息
其他配置
- 默认配置可能导致Channel线程不安全, 可以配置Channel缓存(池化)
//缓存模式 缓存channel
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
//最多缓存25个channel
connectionFactory.setChannelCacheSize(25);
//channel超时等待 当ChannelCheckoutTimeout的值大于0的时候,ChannelCacheSize的值就是最大的channel数量了,一旦从缓存中获取不到channel,等待ChannelCheckoutTimeout毫秒后,如果还是获取不到的,就会抛AmqpTimeoutException
connectionFactory.setChannelCheckoutTimeout(1000);