1. 前言
在消息中间件你该了解的秘密一文中详细介绍了如何使用RabbitMQ
发送消息、消费消息;如何保证生产者发送消息的可靠性;如何保证消费消息的可靠性、如何横向扩展消费者以及如何对消费者进行流向削峰。
2.初衷
本文的初衷旨在搞懂为什么使用@Component
@RabbitListener
注解就可以完成消息的消费,如何对消费者进行定制化配置?带着这些疑问开启本次的源码分析之路。
3.源码分析
3.1 寻找自动配置类
众所周知,所有与SpringBoot
整合的中间件都是以starter
的方式引入到项目中,这种情况下SpringBoot
会有一个相关的自动配置类帮我们做一些默认配置,从而达到开箱即用的效果。寻找相关的自动配置类方法也很简单,只需要输入与之相关的名称即可,例如输入RabbitAuto
就可以搜索RabbitAutoConfiguration
这样一个自动配置类
3.2 自动配置类分析
打开RabbitAutoConfiguration
自动配置类,可以看到如上内容:
@Configuration
注解表明这一个配置类,会被框架扫描解析并注入到IOC容器中@ConditionalOnClass
注解表明,只有classpath路径下存在RabbitTemplate.class
和Channel.class
这两个类才会去扫描解析当前自动配置类@EnableConfigurationProperties
注解实例化RabbitProperties
并将yml或properties文件中的相关属性注入到该对象中@Import
注解表明要引入RabbitAnnotationDrivenConfiguration
配置类
根据RabbitAnnotationDrivenConfiguration
名称可以猜出该配置类大概就是我们要分析的入口类了
3.3 入口类分析
3.3.1 初始化消息相关属性
3.3.2 声明监听器容器工厂配置以及监听器容器工厂
进入配置类配置容器工厂的方法中
这里我们可以得出一个重要的信息,那就是可以通过配置文件中的配置对SimpleRabbitListenerContainerFactory
进行相关配置操作
spring:
rabbitmq:
listener:
simple:
concurrency: 4
max-concurrency: 10
batch-size: 100
prefetch: 100
acknowledge-mode: MANUAL
到目前一共出现了几个比较重要的类:
SimpleRabbitListenerContainerFactoryConfigurer
SimpleRabbitListenerContainerFactory
RabbitProperties
它们之间的关系是这样的SimpleRabbitListenerContainerFactoryConfigurer
容器工厂配置类持有RabbitProperties
属性配置对象,然后对SimpleRabbitListenerContainerFactory
进行相关配置,RabbitProperties
属性又可以在application.yml文件中进行配置
3.3.3 开启RabbitMQ功能
@EnableRabbit注解引入了两个重要的类RabbitListenerAnnotationBeanPostProcessor
和RabbitListenerEndpointRegistry
。RabbitListenerAnnotationBeanPostProcessor
用于Bean的后置处理,这里可以想象一下该后置处理器会Bean实例化之后,对含有@RabbitListener
注解的类进行特殊代理,从而实现对消息的消费
3.4 后置处理
3.4.1 后置处理方法
在RabbitListenerAnnotationBeanPostProcessor
后置处理器的后置处理方法
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
Class<?> targetClass = AopUtils.getTargetClass(bean);
final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
for (ListenerMethod lm : metadata.listenerMethods) {
for (RabbitListener rabbitListener : lm.annotations) {
processAmqpListener(rabbitListener, lm.method, bean, beanName);
}
}
return bean;
}
中会对含有@RabbitListener
注解的bean进行处理
protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
Method methodToUse = checkProxy(method, bean);
MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
endpoint.setMethod(methodToUse);
processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
}
protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
Object target, String beanName) {
endpoint.setBean(bean);
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
endpoint.setId(getEndpointId(rabbitListener));
endpoint.setQueueNames(resolveQueues(rabbitListener));
endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
endpoint.setBeanFactory(this.beanFactory);
endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
// ...省略部分代码
resolveExecutor(endpoint, rabbitListener, target, beanName);
resolveAdmin(endpoint, rabbitListener, target);
resolveAckMode(endpoint, rabbitListener);
resolvePostProcessor(endpoint, rabbitListener, target, beanName);
RabbitListenerContainerFactory<?> factory = resolveContainerFactory(rabbitListener, target, beanName);
this.registrar.registerEndpoint(endpoint, factory);
}
每个含有@RabbitListener
注解的方法对应一个MethodRabbitListenerEndpoint
对象,该对象会存储被@RabbitListener
注解方法相关属性以及@RabbitListener
注解指定的属性,最终和SimpleRabbitListenerContainerFactory
组成AmqpListenerEndpointDescriptor
对象放入endpointDescriptors
集合中
3.4.3 Bean创建完成后的初始化方法
由于RabbitListenerAnnotationBeanPostProcessor
实现了SmartInitializingSingleton
接口,因此会回调afterSingletonsInstantiated()
方法,在回调方法中会遍历3.4.2endpointDescriptors
集合进行SimpleMessageListenerContainer
注册,然后使用SimpleRabbitListenerContainerFactory
和MethodRabbitListenerEndpoint
进行属性配置,最终把创建的SimpleMessageListenerContainer
放入RabbitListenerEndpointRegistry
的listenerContainers
容器中
3.5 应用生命周期
RabbitListenerEndpointRegistry
实现了SmartLifecycle
接口,在应用启动完成之后会回调start()
方法
3.6 小结
RabbitListenerAnnotationBeanPostProcessor
后置处理方法会处理含有@RabbitListener
注解的方法,创建MethodRabbitListenerEndpoint
对象与SimpleRabbitListenerContainerFactory
组成一对放入集合中RabbitListenerAnnotationBeanPostProcessor
初始化方法会遍历上一步的集合根据SimpleRabbitListenerContainerFactory
创建SimpleMessageListenerContainer
对象,然后使用SimpleRabbitListenerContainerFactory
和MethodRabbitListenerEndpoint
对SimpleMessageListenerContainer
进行配置,最终将SimpleMessageListenerContainer
放入map容器中RabbitListenerEndpointRegistry
启动方法会遍历上一步的map容器,启动SimpleMessageListenerContainer
4. 启动监听容器
4.1 启动
代码语言:javascript复制protected void doStart() {
synchronized (this.consumersMonitor) {
// 1.根据并发数来初始化消费者
int newConsumers = initializeConsumers();
Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
for (BlockingQueueConsumer consumer : this.consumers) {
// 2.每个消费者创建一个AsyncMessageProcessingConsumer对象
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
// 3.使用一个线程去执行
getTaskExecutor().execute(processor);
if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
}
}
waitForConsumersToStart(processors);
}
}
4.2 执行任务
代码语言:javascript复制@Override // NOSONAR - complexity - many catch blocks
public void run() { // NOSONAR - line count
try {
// 1.启动消费者
initialize();
// 2.消费消息
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
mainLoop();
}
}
}
4.2 启动消费者
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run--->org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#initialize--->org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#start--->org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#setQosAndreateConsumers--->org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#consumeFromQueue
代码语言:javascript复制private void consumeFromQueue(String queue) throws IOException {
// 1.设置消息回调处理
InternalConsumer consumer = new InternalConsumer(this.channel, queue);
// 2.告诉消息服务器当前channel消费的队列名称,消息ack模式以及当消息服务器推送消息后要执行的回调consumer
String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),
(this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), this.noLocal,
this.exclusive, this.consumerArgs,
consumer);
if (consumerTag != null) {
this.consumers.put(queue, consumer);
if (logger.isDebugEnabled()) {
logger.debug("Started on queue '" queue "' with tag " consumerTag ": " this);
}
}
else {
logger.error("Null consumer tag received for queue " queue);
}
}
4.3 消息回调处理
消息回调处理由InternalConsumer负责
代码语言:javascript复制@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) {
try {
if (BlockingQueueConsumer.this.abortStarted > 0) {
// 1.当消息服务器将消息派发给消费者时由回调将消息放入到队列中
if (!BlockingQueueConsumer.this.queue.offer(
new Delivery(consumerTag, envelope, properties, body, this.queueName),
BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {
}
}
else {
BlockingQueueConsumer.this.queue
.put(new Delivery(consumerTag, envelope, properties, body, this.queueName));
}
}
}
4.4 消费消息
回到4.2部分的代码,进入mainLoop()
查看消费消息的逻辑
private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception { //NOSONAR
Channel channel = consumer.getChannel();
List<Message> messages = null;
long deliveryTag = 0;
for (int i = 0; i < this.batchSize; i ) {
logger.trace("Waiting for message from consumer.");
// 1.从队列中取消息
Message message = consumer.nextMessage(this.receiveTimeout);
if (this.consumerBatchEnabled) {
}
else {
try {
// 2.调用监听器方法进行执行业务逻辑
executeListener(channel, message);
}
}
}
if (this.consumerBatchEnabled && messages != null) {
executeWithList(channel, messages, deliveryTag, consumer);
}
// 3.向消息服务器发送ack
return consumer.commitIfNecessary(isChannelLocallyTransacted());
}
4.5 小结
- 启动
SimpleMessageListenerContainer
,根据并发数创建消费者 - 告诉消息服务器要消费的队列、ack模式,指定处理消息的回调
- 消息服务器推送消息给消费者,执行回调,回调将消息放入队列中
- 消费者线程无限循环从队列中取消息,消费消息执行业务逻辑
- 执行完业务逻辑后向消息服务器发送ack
- 之前,给大家发过三份Java面试宝典,这次新增了一份,目前总共是四份面试宝典,相信在跳槽前一个月按照面试宝典准备准备,基本没大问题。《java面试宝典5.0》(初中级)《350道Java面试题:整理自100 公司》(中高级)《资深java面试宝典-视频版》(资深)《Java[BAT]面试必备》(资深)分别适用于初中级,中高级,资深级工程师的面试复习。内容包含java基础、javaweb、mysql性能优化、JVM、锁、百万并发、消息队列,高性能缓存、反射、Spring全家桶原理、微服务、Zookeeper、数据结构、限流熔断降级等等。获取方式:点“在看”,V信关注上述Java最全面试题库号并回复 【面试】即可领取,更多精彩陆续奉上。 看到这里,证明有所收获必须点个在看支持呀,喵