@RabbitListener注解你不知道的都在这

2021-01-05 11:10:29 浏览数 (1)

1. 前言

在消息中间件你该了解的秘密一文中详细介绍了如何使用RabbitMQ发送消息、消费消息;如何保证生产者发送消息的可靠性;如何保证消费消息的可靠性、如何横向扩展消费者以及如何对消费者进行流向削峰。

2.初衷

本文的初衷旨在搞懂为什么使用@Component @RabbitListener注解就可以完成消息的消费,如何对消费者进行定制化配置?带着这些疑问开启本次的源码分析之路。

3.源码分析

3.1 寻找自动配置类

众所周知,所有与SpringBoot整合的中间件都是以starter的方式引入到项目中,这种情况下SpringBoot会有一个相关的自动配置类帮我们做一些默认配置,从而达到开箱即用的效果。寻找相关的自动配置类方法也很简单,只需要输入与之相关的名称即可,例如输入RabbitAuto就可以搜索RabbitAutoConfiguration这样一个自动配置类

3.2 自动配置类分析

打开RabbitAutoConfiguration自动配置类,可以看到如上内容:

  • @Configuration注解表明这一个配置类,会被框架扫描解析并注入到IOC容器中
  • @ConditionalOnClass注解表明,只有classpath路径下存在RabbitTemplate.classChannel.class这两个类才会去扫描解析当前自动配置类
  • @EnableConfigurationProperties注解实例化RabbitProperties并将yml或properties文件中的相关属性注入到该对象中
  • @Import注解表明要引入RabbitAnnotationDrivenConfiguration配置类

根据RabbitAnnotationDrivenConfiguration名称可以猜出该配置类大概就是我们要分析的入口类了

3.3 入口类分析

3.3.1 初始化消息相关属性
3.3.2 声明监听器容器工厂配置以及监听器容器工厂

进入配置类配置容器工厂的方法中

这里我们可以得出一个重要的信息,那就是可以通过配置文件中的配置对SimpleRabbitListenerContainerFactory进行相关配置操作

代码语言:javascript复制
spring:
  rabbitmq:
    listener:
      simple:
        concurrency: 4
        max-concurrency: 10
        batch-size: 100
        prefetch: 100
        acknowledge-mode: MANUAL

到目前一共出现了几个比较重要的类:

  • SimpleRabbitListenerContainerFactoryConfigurer
  • SimpleRabbitListenerContainerFactory
  • RabbitProperties

它们之间的关系是这样的SimpleRabbitListenerContainerFactoryConfigurer容器工厂配置类持有RabbitProperties属性配置对象,然后对SimpleRabbitListenerContainerFactory进行相关配置,RabbitProperties属性又可以在application.yml文件中进行配置

3.3.3 开启RabbitMQ功能

@EnableRabbit注解引入了两个重要的类RabbitListenerAnnotationBeanPostProcessorRabbitListenerEndpointRegistryRabbitListenerAnnotationBeanPostProcessor用于Bean的后置处理,这里可以想象一下该后置处理器会Bean实例化之后,对含有@RabbitListener注解的类进行特殊代理,从而实现对消息的消费

3.4 后置处理

3.4.1 后置处理方法

RabbitListenerAnnotationBeanPostProcessor后置处理器的后置处理方法

代码语言:javascript复制
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
    Class<?> targetClass = AopUtils.getTargetClass(bean);
    final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
    for (ListenerMethod lm : metadata.listenerMethods) {
        for (RabbitListener rabbitListener : lm.annotations) {
            processAmqpListener(rabbitListener, lm.method, bean, beanName);
        }
    }
    return bean;
}

中会对含有@RabbitListener注解的bean进行处理

代码语言:javascript复制
protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
    Method methodToUse = checkProxy(method, bean);
    MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
    endpoint.setMethod(methodToUse);
    processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
}

protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
   Object target, String beanName) {

  endpoint.setBean(bean);
  endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
  endpoint.setId(getEndpointId(rabbitListener));
  endpoint.setQueueNames(resolveQueues(rabbitListener));
  endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
  endpoint.setBeanFactory(this.beanFactory);
  endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
  // ...省略部分代码
  resolveExecutor(endpoint, rabbitListener, target, beanName);
  resolveAdmin(endpoint, rabbitListener, target);
  resolveAckMode(endpoint, rabbitListener);
  resolvePostProcessor(endpoint, rabbitListener, target, beanName);
  RabbitListenerContainerFactory<?> factory = resolveContainerFactory(rabbitListener, target, beanName);

  this.registrar.registerEndpoint(endpoint, factory);
 }

每个含有@RabbitListener注解的方法对应一个MethodRabbitListenerEndpoint对象,该对象会存储被@RabbitListener 注解方法相关属性以及@RabbitListener注解指定的属性,最终和SimpleRabbitListenerContainerFactory组成AmqpListenerEndpointDescriptor对象放入endpointDescriptors集合中

3.4.3 Bean创建完成后的初始化方法

由于RabbitListenerAnnotationBeanPostProcessor实现了SmartInitializingSingleton接口,因此会回调afterSingletonsInstantiated()方法,在回调方法中会遍历3.4.2endpointDescriptors集合进行SimpleMessageListenerContainer注册,然后使用SimpleRabbitListenerContainerFactoryMethodRabbitListenerEndpoint进行属性配置,最终把创建的SimpleMessageListenerContainer放入RabbitListenerEndpointRegistrylistenerContainers容器中

3.5 应用生命周期

RabbitListenerEndpointRegistry实现了SmartLifecycle接口,在应用启动完成之后会回调start()方法

3.6 小结

  • RabbitListenerAnnotationBeanPostProcessor后置处理方法会处理含有@RabbitListener注解的方法,创建MethodRabbitListenerEndpoint对象与SimpleRabbitListenerContainerFactory组成一对放入集合中
  • RabbitListenerAnnotationBeanPostProcessor初始化方法会遍历上一步的集合根据SimpleRabbitListenerContainerFactory创建SimpleMessageListenerContainer对象,然后使用SimpleRabbitListenerContainerFactoryMethodRabbitListenerEndpointSimpleMessageListenerContainer进行配置,最终将SimpleMessageListenerContainer放入map容器中
  • RabbitListenerEndpointRegistry启动方法会遍历上一步的map容器,启动SimpleMessageListenerContainer

4. 启动监听容器

4.1 启动
代码语言:javascript复制
protected void doStart() {
    synchronized (this.consumersMonitor) {
        // 1.根据并发数来初始化消费者
        int newConsumers = initializeConsumers();
        Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
        for (BlockingQueueConsumer consumer : this.consumers) {
            // 2.每个消费者创建一个AsyncMessageProcessingConsumer对象
            AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
            processors.add(processor);
            // 3.使用一个线程去执行
            getTaskExecutor().execute(processor);
            if (getApplicationEventPublisher() != null) {
                getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
            }
        }
        waitForConsumersToStart(processors);
    }
}
4.2 执行任务
代码语言:javascript复制
@Override // NOSONAR - complexity - many catch blocks
public void run() { // NOSONAR - line count
    try {
        // 1.启动消费者
        initialize();
        // 2.消费消息
        while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
            mainLoop();
        }
    }
}
4.2 启动消费者

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run--->org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#initialize--->org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#start--->org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#setQosAndreateConsumers--->org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#consumeFromQueue

代码语言:javascript复制
private void consumeFromQueue(String queue) throws IOException {
    // 1.设置消息回调处理
    InternalConsumer consumer = new InternalConsumer(this.channel, queue);
    // 2.告诉消息服务器当前channel消费的队列名称,消息ack模式以及当消息服务器推送消息后要执行的回调consumer
    String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),
                                                   (this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), this.noLocal,
                                                   this.exclusive, this.consumerArgs,
                                                   consumer);

    if (consumerTag != null) {
        this.consumers.put(queue, consumer);
        if (logger.isDebugEnabled()) {
            logger.debug("Started on queue '"   queue   "' with tag "   consumerTag   ": "   this);
        }
    }
    else {
        logger.error("Null consumer tag received for queue "   queue);
    }
}
4.3 消息回调处理

消息回调处理由InternalConsumer负责

代码语言:javascript复制
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                           byte[] body) {
    try {
        if (BlockingQueueConsumer.this.abortStarted > 0) {
            // 1.当消息服务器将消息派发给消费者时由回调将消息放入到队列中
            if (!BlockingQueueConsumer.this.queue.offer(
                new Delivery(consumerTag, envelope, properties, body, this.queueName),
                BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {
            }
        }
        else {
            BlockingQueueConsumer.this.queue
                .put(new Delivery(consumerTag, envelope, properties, body, this.queueName));
        }
    }
}
4.4 消费消息

回到4.2部分的代码,进入mainLoop()查看消费消息的逻辑

代码语言:javascript复制
private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception { //NOSONAR

    Channel channel = consumer.getChannel();

    List<Message> messages = null;
    long deliveryTag = 0;

    for (int i = 0; i < this.batchSize; i  ) {

        logger.trace("Waiting for message from consumer.");
        // 1.从队列中取消息
        Message message = consumer.nextMessage(this.receiveTimeout);
        if (this.consumerBatchEnabled) {
        }
        else {
            try {
                // 2.调用监听器方法进行执行业务逻辑
                executeListener(channel, message);
            }
        }
    }
    if (this.consumerBatchEnabled && messages != null) {
        executeWithList(channel, messages, deliveryTag, consumer);
    }
 // 3.向消息服务器发送ack
    return consumer.commitIfNecessary(isChannelLocallyTransacted());

}
4.5 小结
  • 启动SimpleMessageListenerContainer,根据并发数创建消费者
  • 告诉消息服务器要消费的队列、ack模式,指定处理消息的回调
  • 消息服务器推送消息给消费者,执行回调,回调将消息放入队列中
  • 消费者线程无限循环从队列中取消息,消费消息执行业务逻辑
  • 执行完业务逻辑后向消息服务器发送ack
  • 之前,给大家发过三份Java面试宝典,这次新增了一份,目前总共是四份面试宝典,相信在跳槽前一个月按照面试宝典准备准备,基本没大问题。《java面试宝典5.0》(初中级)《350道Java面试题:整理自100 公司》(中高级)《资深java面试宝典-视频版》(资深)《Java[BAT]面试必备》(资深)分别适用于初中级,中高级,资深级工程师的面试复习。内容包含java基础、javaweb、mysql性能优化、JVM、锁、百万并发、消息队列,高性能缓存、反射、Spring全家桶原理、微服务、Zookeeper、数据结构、限流熔断降级等等。获取方式:点“在看”,V信关注上述Java最全面试题库号并回复 【面试】即可领取,更多精彩陆续奉上。 看到这里,证明有所收获必须点个在看支持呀,喵

0 人点赞