A Look at How KafkaListener Is Implemented

2023-10-22 23:56:34

This article mainly looks into how KafkaListener is implemented.

KafkaListener

org/springframework/kafka/annotation/KafkaListener.java

@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public @interface KafkaListener {
	String id() default "";
	String containerFactory() default "";
	String[] topics() default {};
	String topicPattern() default "";
	TopicPartition[] topicPartitions() default {};
	String containerGroup() default "";
	String errorHandler() default "";
	String groupId() default "";
	boolean idIsGroup() default true;
	String clientIdPrefix() default "";
	String beanRef() default "__listener";
	String concurrency() default "";
	String autoStartup() default "";
	String[] properties() default {};
}

The KafkaListener annotation defines attributes such as id, topics and groupId.

KafkaListenerAnnotationBeanPostProcessor

org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

public class KafkaListenerAnnotationBeanPostProcessor<K, V>
		implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {

	private final KafkaListenerEndpointRegistrar registrar = new KafkaListenerEndpointRegistrar();	

	@Override
	public int getOrder() {
		return LOWEST_PRECEDENCE;
	}

	@Override
	public void setBeanFactory(BeanFactory beanFactory) {
		this.beanFactory = beanFactory;
		if (beanFactory instanceof ConfigurableListableBeanFactory) {
			this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver();
			this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory,
					this.listenerScope);
		}
	}

	@Override
	public void afterSingletonsInstantiated() {
		this.registrar.setBeanFactory(this.beanFactory);

		if (this.beanFactory instanceof ListableBeanFactory) {
			Map<String, KafkaListenerConfigurer> instances =
					((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
			for (KafkaListenerConfigurer configurer : instances.values()) {
				configurer.configureKafkaListeners(this.registrar);
			}
		}

		if (this.registrar.getEndpointRegistry() == null) {
			if (this.endpointRegistry == null) {
				Assert.state(this.beanFactory != null,
						"BeanFactory must be set to find endpoint registry by bean name");
				this.endpointRegistry = this.beanFactory.getBean(
						KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
						KafkaListenerEndpointRegistry.class);
			}
			this.registrar.setEndpointRegistry(this.endpointRegistry);
		}

		if (this.defaultContainerFactoryBeanName != null) {
			this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);
		}

		// Set the custom handler method factory once resolved by the configurer
		MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
		if (handlerMethodFactory != null) {
			this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
		}
		else {
			addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService);
		}

		// Actually register all listeners
		this.registrar.afterPropertiesSet();
	}

	@Override
	public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
		if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
			Class<?> targetClass = AopUtils.getTargetClass(bean);
			Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
			final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
			final List<Method> multiMethods = new ArrayList<>();
			Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
					(MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
						Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
						return (!listenerMethods.isEmpty() ? listenerMethods : null);
					});
			if (hasClassLevelListeners) {
				Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
						(ReflectionUtils.MethodFilter) method ->
								AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
				multiMethods.addAll(methodsWithHandler);
			}
			if (annotatedMethods.isEmpty()) {
				this.nonAnnotatedClasses.add(bean.getClass());
				if (this.logger.isTraceEnabled()) {
					this.logger.trace("No @KafkaListener annotations found on bean type: " + bean.getClass());
				}
			}
			else {
				// Non-empty set of methods
				for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
					Method method = entry.getKey();
					for (KafkaListener listener : entry.getValue()) {
						processKafkaListener(listener, method, bean, beanName);
					}
				}
				if (this.logger.isDebugEnabled()) {
					this.logger.debug(annotatedMethods.size() + " @KafkaListener methods processed on bean '"
							+ beanName + "': " + annotatedMethods);
				}
			}
			if (hasClassLevelListeners) {
				processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
			}
		}
		return bean;
	}		
}		

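KafkaListenerAnnotationBeanPostProcessor implements the BeanPostProcessor, Ordered, BeanFactoryAware and SmartInitializingSingleton interfaces, and its getOrder returns LOWEST_PRECEDENCE.

Its afterSingletonsInstantiated method (from SmartInitializingSingleton) first looks up all KafkaListenerConfigurer beans and passes the registrar to each one's configureKafkaListeners. It then sets the endpoint registry and the default container factory bean name on the registrar, and finally calls registrar.afterPropertiesSet().

Its postProcessAfterInitialization method (from BeanPostProcessor) collects the bean's methods that are annotated with KafkaListener and calls processKafkaListener for each of them. When the class itself carries KafkaListener, the methods annotated with KafkaHandler are handed to processMultiMethodListeners.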
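The method-scanning step can be illustrated without Spring. The following is only a minimal sketch using plain reflection: `Listen`, `OrderHandler` and `scan` are hypothetical names invented for the illustration, and the real post-processor relies on MethodIntrospector and merged annotations rather than `getDeclaredMethods`.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for @KafkaListener; this is not the Spring annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Listen {
    String topic();
}

class ListenerScanSketch {

    // Hypothetical bean with one annotated method and one plain method.
    static class OrderHandler {
        @Listen(topic = "orders")
        public void onOrder(String payload) {
            // a real listener would handle the record here
        }

        public void helper() {
            // not annotated, so the scan ignores it
        }
    }

    // Collects "topic -> method name" for every annotated method. This is
    // roughly the information postProcessAfterInitialization gathers before
    // it hands each method to processKafkaListener.
    static List<String> scan(Object bean) {
        List<String> found = new ArrayList<>();
        for (Method method : bean.getClass().getDeclaredMethods()) {
            Listen listen = method.getAnnotation(Listen.class);
            if (listen != null) {
                found.add(listen.topic() + " -> " + method.getName());
            }
        }
        return found;
    }

    public static void main(String[] args) {
        System.out.println(scan(new OrderHandler()));
    }
}
```

The annotation needs RUNTIME retention for the reflective lookup to see it, which is also why KafkaListener itself is declared with `@Retention(RetentionPolicy.RUNTIME)`.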

processKafkaListener

	protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
		Method methodToUse = checkProxy(method, bean);
		MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
		endpoint.setMethod(methodToUse);
		processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
	}

	protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
			Object bean, Object adminTarget, String beanName) {

		String beanRef = kafkaListener.beanRef();
		if (StringUtils.hasText(beanRef)) {
			this.listenerScope.addListener(beanRef, bean);
		}
		endpoint.setBean(bean);
		endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
		endpoint.setId(getEndpointId(kafkaListener));
		endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
		endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
		endpoint.setTopics(resolveTopics(kafkaListener));
		endpoint.setTopicPattern(resolvePattern(kafkaListener));
		endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
		String group = kafkaListener.containerGroup();
		if (StringUtils.hasText(group)) {
			Object resolvedGroup = resolveExpression(group);
			if (resolvedGroup instanceof String) {
				endpoint.setGroup((String) resolvedGroup);
			}
		}
		String concurrency = kafkaListener.concurrency();
		if (StringUtils.hasText(concurrency)) {
			endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
		}
		String autoStartup = kafkaListener.autoStartup();
		if (StringUtils.hasText(autoStartup)) {
			endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
		}
		resolveKafkaProperties(endpoint, kafkaListener.properties());

		KafkaListenerContainerFactory<?> factory = null;
		String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
		if (StringUtils.hasText(containerFactoryBeanName)) {
			Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
			try {
				factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
			}
			catch (NoSuchBeanDefinitionException ex) {
				throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
						+ "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
						+ " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
			}
		}

		endpoint.setBeanFactory(this.beanFactory);
		String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
		if (StringUtils.hasText(errorHandlerBeanName)) {
			endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
		}
		this.registrar.registerEndpoint(endpoint, factory);
		if (StringUtils.hasText(beanRef)) {
			this.listenerScope.removeListener(beanRef);
		}
	}	

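processKafkaListener wraps the method in a MethodKafkaListenerEndpoint and then calls processListener. That method mainly copies the attributes of the KafkaListener annotation onto the MethodKafkaListenerEndpoint, resolves the KafkaListenerContainerFactory (it stays null when containerFactory is blank, so the default factory is resolved later), and finally calls registrar.registerEndpoint(endpoint, factory).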

registrar.registerEndpoint

org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java

	/**
	 * Register a new {@link KafkaListenerEndpoint} alongside the
	 * {@link KafkaListenerContainerFactory} to use to create the underlying container.
	 * <p>The {@code factory} may be {@code null} if the default factory has to be
	 * used for that endpoint.
	 * @param endpoint the {@link KafkaListenerEndpoint} instance to register.
	 * @param factory the {@link KafkaListenerContainerFactory} to use.
	 */
	public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
		Assert.notNull(endpoint, "Endpoint must be set");
		Assert.hasText(endpoint.getId(), "Endpoint id must be set");
		// Factory may be null, we defer the resolution right before actually creating the container
		KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
		synchronized (this.endpointDescriptors) {
			if (this.startImmediately) { // Register and start immediately
				this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
						resolveContainerFactory(descriptor), true);
			}
			else {
				this.endpointDescriptors.add(descriptor);
			}
		}
	}

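KafkaListenerEndpointRegistrar's registerEndpoint wraps the endpoint and factory in a KafkaListenerEndpointDescriptor. If startImmediately is set, it calls endpointRegistry.registerListenerContainer right away; otherwise the descriptor is only added to endpointDescriptors, and the actual registration is left to registrar.afterPropertiesSet().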
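The two branches of registerEndpoint amount to a "queue now, register later" pattern. The sketch below models it with plain strings; the class and method bodies are hypothetical. In particular, the body of afterPropertiesSet is not shown in the excerpt, so the sketch assumes it flushes the queued descriptors and then switches to immediate mode, as the comment "Actually register all listeners" suggests.

```java
import java.util.ArrayList;
import java.util.List;

class DeferredRegistrarSketch {

    // Plays the role of endpointDescriptors: endpoints waiting to be registered.
    private final List<String> pending = new ArrayList<>();

    // Plays the role of the endpoint registry: endpoints actually registered.
    private final List<String> registered = new ArrayList<>();

    private boolean startImmediately;

    void registerEndpoint(String endpointId) {
        synchronized (this.pending) {
            if (this.startImmediately) {
                // late registration: goes straight to the registry
                this.registered.add(endpointId);
            } else {
                // early registration: only queued
                this.pending.add(endpointId);
            }
        }
    }

    // Assumption: flush everything queued so far, then register immediately from now on.
    void afterPropertiesSet() {
        synchronized (this.pending) {
            this.registered.addAll(this.pending);
            this.pending.clear();
            this.startImmediately = true;
        }
    }

    List<String> registered() {
        synchronized (this.pending) {
            return List.copyOf(this.registered);
        }
    }

    public static void main(String[] args) {
        DeferredRegistrarSketch registrar = new DeferredRegistrarSketch();
        registrar.registerEndpoint("a");
        System.out.println(registrar.registered()); // nothing registered yet
        registrar.afterPropertiesSet();
        registrar.registerEndpoint("b");
        System.out.println(registrar.registered()); // both endpoints registered
    }
}
```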

endpointRegistry.registerListenerContainer

org/springframework/kafka/config/KafkaListenerEndpointRegistry.java

	public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
			boolean startImmediately) {
		Assert.notNull(endpoint, "Endpoint must not be null");
		Assert.notNull(factory, "Factory must not be null");

		String id = endpoint.getId();
		Assert.hasText(id, "Endpoint id must not be empty");
		synchronized (this.listenerContainers) {
			Assert.state(!this.listenerContainers.containsKey(id),
					"Another endpoint is already registered with id '" + id + "'");
			MessageListenerContainer container = createListenerContainer(endpoint, factory);
			this.listenerContainers.put(id, container);
			if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
				List<MessageListenerContainer> containerGroup;
				if (this.applicationContext.containsBean(endpoint.getGroup())) {
					containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
				}
				else {
					containerGroup = new ArrayList<MessageListenerContainer>();
					this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
				}
				containerGroup.add(container);
			}
			if (startImmediately) {
				startIfNecessary(container);
			}
		}
	}

	/**
	 * Start the specified {@link MessageListenerContainer} if it should be started
	 * on startup.
	 * @param listenerContainer the listener container to start.
	 * @see MessageListenerContainer#isAutoStartup()
	 */
	private void startIfNecessary(MessageListenerContainer listenerContainer) {
		if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
			listenerContainer.start();
		}
	}	

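KafkaListenerEndpointRegistry's registerListenerContainer method creates a MessageListenerContainer from the endpoint and factory and stores it in listenerContainers, rejecting duplicate endpoint ids. When startImmediately is true it calls startIfNecessary, which calls listenerContainer.start() if the context has already been refreshed or the container's isAutoStartup() returns true.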
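The id check and the conditional start can be reduced to a few lines. This is a simplified sketch, not the registry's code: `Container`, `register` and `markContextRefreshed` are hypothetical names, and container groups are left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RegistrySketch {

    // Hypothetical minimal container that only remembers whether it was started.
    static class Container {
        final boolean autoStartup;
        boolean running;

        Container(boolean autoStartup) {
            this.autoStartup = autoStartup;
        }

        void start() {
            this.running = true;
        }
    }

    private final Map<String, Container> containers = new LinkedHashMap<>();

    private boolean contextRefreshed;

    void markContextRefreshed() {
        this.contextRefreshed = true;
    }

    Container register(String id, boolean autoStartup, boolean startImmediately) {
        synchronized (this.containers) {
            // same guard as the Assert.state on listenerContainers.containsKey(id)
            if (this.containers.containsKey(id)) {
                throw new IllegalStateException(
                        "Another endpoint is already registered with id '" + id + "'");
            }
            Container container = new Container(autoStartup);
            this.containers.put(id, container);
            // startIfNecessary: start only if the context is refreshed or autoStartup is on
            if (startImmediately && (this.contextRefreshed || container.autoStartup)) {
                container.start();
            }
            return container;
        }
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        System.out.println(registry.register("a", true, true).running);
        System.out.println(registry.register("b", false, true).running);
    }
}
```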

MessageListenerContainer

org/springframework/kafka/listener/MessageListenerContainer.java

public interface MessageListenerContainer extends SmartLifecycle {
	void setupMessageListener(Object messageListener);
	Map<String, Map<MetricName, ? extends Metric>> metrics();
	default ContainerProperties getContainerProperties() {
		throw new UnsupportedOperationException("This container doesn't support retrieving its properties");
	}
	default Collection<TopicPartition> getAssignedPartitions() {
		throw new UnsupportedOperationException("This container doesn't support retrieving its assigned partitions");
	}
	default void pause() {
		throw new UnsupportedOperationException("This container doesn't support pause");
	}
	default void resume() {
		throw new UnsupportedOperationException("This container doesn't support resume");
	}
	default boolean isPauseRequested() {
		throw new UnsupportedOperationException("This container doesn't support pause/resume");
	}
	default boolean isContainerPaused() {
		throw new UnsupportedOperationException("This container doesn't support pause/resume");
	}
	default void setAutoStartup(boolean autoStartup) {
		// empty
	}
	default String getGroupId() {
		throw new UnsupportedOperationException("This container does not support retrieving the group id");
	}
	@Nullable
	default String getListenerId() {
		throw new UnsupportedOperationException("This container does not support retrieving the listener id");
	}
}

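MessageListenerContainer extends the SmartLifecycle interface. It has a generic sub-interface, GenericMessageListenerContainer, which is implemented by the abstract class AbstractMessageListenerContainer; that class in turn has two subclasses, KafkaMessageListenerContainer and ConcurrentMessageListenerContainer.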

AbstractMessageListenerContainer

public abstract class AbstractMessageListenerContainer<K, V>
		implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware {

	@Override
	public final void start() {
		checkGroupId();
		synchronized (this.lifecycleMonitor) {
			if (!isRunning()) {
				Assert.isTrue(this.containerProperties.getMessageListener() instanceof GenericMessageListener,
						() -> "A " + GenericMessageListener.class.getName() + " implementation must be provided");
				doStart();
			}
		}
	}

	@Override
	public final void stop() {
		synchronized (this.lifecycleMonitor) {
			if (isRunning()) {
				final CountDownLatch latch = new CountDownLatch(1);
				doStop(latch::countDown);
				try {
					latch.await(this.containerProperties.getShutdownTimeout(), TimeUnit.MILLISECONDS); // NOSONAR
					publishContainerStoppedEvent();
				}
				catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				}
			}
		}
	}
	//......
}		

AbstractMessageListenerContainer's start method delegates to the subclass's doStart method, and its stop method delegates to the subclass's doStop method.
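This is the template method pattern: the final start and stop methods own the locking and the running check, and subclasses only supply doStart and doStop. A minimal sketch, assuming a much simpler contract than the real class (the hypothetical doStop here takes no callback, and there is no shutdown timeout):

```java
class LifecycleSketch {

    abstract static class AbstractContainer {

        private final Object lifecycleMonitor = new Object();

        private volatile boolean running;

        // final: subclasses cannot bypass the lock or the running check
        public final void start() {
            synchronized (this.lifecycleMonitor) {
                if (!this.running) {
                    doStart();
                }
            }
        }

        public final void stop() {
            synchronized (this.lifecycleMonitor) {
                if (this.running) {
                    doStop();
                }
            }
        }

        public boolean isRunning() {
            return this.running;
        }

        protected void setRunning(boolean running) {
            this.running = running;
        }

        protected abstract void doStart();

        protected abstract void doStop();
    }

    // Hypothetical subclass that counts how often the hooks are invoked.
    static class CountingContainer extends AbstractContainer {

        int starts;

        int stops;

        @Override
        protected void doStart() {
            this.starts++;
            setRunning(true);
        }

        @Override
        protected void doStop() {
            this.stops++;
            setRunning(false);
        }
    }

    public static void main(String[] args) {
        CountingContainer container = new CountingContainer();
        container.start();
        container.start(); // ignored, already running
        container.stop();
        System.out.println(container.starts + " start(s), " + container.stops + " stop(s)");
    }
}
```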

KafkaMessageListenerContainer

org/springframework/kafka/listener/KafkaMessageListenerContainer.java

public class KafkaMessageListenerContainer<K, V> // NOSONAR comment density
		extends AbstractMessageListenerContainer<K, V> {

	@Override
	protected void doStart() {
		if (isRunning()) {
			return;
		}
		if (this.clientIdSuffix == null) { // stand-alone container
			checkTopics();
		}
		ContainerProperties containerProperties = getContainerProperties();
		checkAckMode(containerProperties);

		Object messageListener = containerProperties.getMessageListener();
		Assert.state(messageListener != null, "A MessageListener is required");
		if (containerProperties.getConsumerTaskExecutor() == null) {
			SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
					(getBeanName() == null ? "" : getBeanName()) + "-C-");
			containerProperties.setConsumerTaskExecutor(consumerExecutor);
		}
		Assert.state(messageListener instanceof GenericMessageListener, "Listener must be a GenericListener");
		GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
		ListenerType listenerType = deteremineListenerType(listener);
		this.listenerConsumer = new ListenerConsumer(listener, listenerType);
		setRunning(true);
		this.listenerConsumerFuture = containerProperties
				.getConsumerTaskExecutor()
				.submitListenable(this.listenerConsumer);
	}

	//......
}		

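KafkaMessageListenerContainer's doStart method obtains the messageListener, creates a ListenerConsumer for it, and submits that to the consumer task executor (a SimpleAsyncTaskExecutor when none is configured).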

ConcurrentMessageListenerContainer

org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {

	@Override
	protected void doStart() {
		if (!isRunning()) {
			checkTopics();
			ContainerProperties containerProperties = getContainerProperties();
			TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
			if (topicPartitions != null && this.concurrency > topicPartitions.length) {
				this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
						+ "equal to the number of partitions; reduced from " + this.concurrency + " to "
						+ topicPartitions.length);
				this.concurrency = topicPartitions.length;
			}
			setRunning(true);

			for (int i = 0; i < this.concurrency; i++) {
				KafkaMessageListenerContainer<K, V> container;
				if (topicPartitions == null) {
					container = new KafkaMessageListenerContainer<>(this, this.consumerFactory, containerProperties);
				}
				else {
					container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
							containerProperties, partitionSubset(containerProperties, i));
				}
				String beanName = getBeanName();
				container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
				if (getApplicationEventPublisher() != null) {
					container.setApplicationEventPublisher(getApplicationEventPublisher());
				}
				container.setClientIdSuffix("-" + i);
				container.setGenericErrorHandler(getGenericErrorHandler());
				container.setAfterRollbackProcessor(getAfterRollbackProcessor());
				container.setRecordInterceptor(getRecordInterceptor());
				container.setEmergencyStop(() -> {
					stop(() -> {
						// NOSONAR
					});
					publishContainerStoppedEvent();
				});
				if (isPaused()) {
					container.pause();
				}
				container.start();
				this.containers.add(container);
			}
		}
	}

	//......	
}

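ConcurrentMessageListenerContainer's doStart creates as many KafkaMessageListenerContainer instances as the concurrency value (reduced to the number of explicitly configured partitions when that is smaller) and calls start on each of them.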
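The fan-out rule is easy to check in isolation. The sketch below only computes the bean names of the child containers that would be created; `childNames` is a hypothetical helper, and a negative `partitionCount` is a convention invented here to mean "no explicit partitions configured".

```java
import java.util.ArrayList;
import java.util.List;

class ConcurrencySketch {

    // Returns the names of the child containers that would be started.
    // partitionCount < 0 means no explicit partitions were configured.
    static List<String> childNames(String beanName, int concurrency, int partitionCount) {
        int effective = concurrency;
        if (partitionCount >= 0 && concurrency > partitionCount) {
            // same cap as in doStart: never more children than partitions
            effective = partitionCount;
        }
        List<String> names = new ArrayList<>();
        for (int i = 0; i < effective; i++) {
            // same naming scheme as container.setBeanName(...) in doStart
            names.add((beanName != null ? beanName : "consumer") + "-" + i);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(childNames("orders", 3, -1));
        System.out.println(childNames(null, 5, 2));
    }
}
```

With a concurrency of 5 and only 2 explicit partitions, only two children are created, which matches the warning logged by doStart.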

ListenerConsumer

org/springframework/kafka/listener/KafkaMessageListenerContainer.java

private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback {

		@Override
		public void run() {
			this.consumerThread = Thread.currentThread();
			if (this.genericListener instanceof ConsumerSeekAware) {
				((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
			}
			if (this.transactionManager != null) {
				ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);
			}
			this.count = 0;
			this.last = System.currentTimeMillis();
			initAsignedPartitions();
			while (isRunning()) {
				try {
					pollAndInvoke();
				}
				catch (@SuppressWarnings(UNUSED) WakeupException e) {
					// Ignore, we're stopping or applying immediate foreign acks
				}
				catch (NoOffsetForPartitionException nofpe) {
					this.fatalError = true;
					ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
					break;
				}
				catch (Exception e) {
					handleConsumerException(e);
				}
				catch (Error e) { // NOSONAR - rethrown
					Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
					if (runnable != null) {
						runnable.run();
					}
					this.logger.error("Stopping container due to an Error", e);
					wrapUp();
					throw e;
				}
			}
			wrapUp();
		}

		protected void pollAndInvoke() {
			if (!this.autoCommit && !this.isRecordAck) {
				processCommits();
			}
			processSeeks();
			checkPaused();
			ConsumerRecords<K, V> records = this.consumer.poll(this.pollTimeout);
			this.lastPoll = System.currentTimeMillis();
			checkResumed();
			debugRecords(records);
			if (records != null && records.count() > 0) {
				if (this.containerProperties.getIdleEventInterval() != null) {
					this.lastReceive = System.currentTimeMillis();
				}
				invokeListener(records);
			}
			else {
				checkIdle();
			}
		}

		private void invokeListener(final ConsumerRecords<K, V> records) {
			if (this.isBatchListener) {
				invokeBatchListener(records);
			}
			else {
				invokeRecordListener(records);
			}
		}

		private void doInvokeBatchOnMessage(final ConsumerRecords<K, V> records,
				List<ConsumerRecord<K, V>> recordList) {
			switch (this.listenerType) {
				case ACKNOWLEDGING_CONSUMER_AWARE:
					this.batchListener.onMessage(recordList,
							this.isAnyManualAck
									? new ConsumerBatchAcknowledgment(records)
									: null, this.consumer);
					break;
				case ACKNOWLEDGING:
					this.batchListener.onMessage(recordList,
							this.isAnyManualAck
									? new ConsumerBatchAcknowledgment(records)
									: null);
					break;
				case CONSUMER_AWARE:
					this.batchListener.onMessage(recordList, this.consumer);
					break;
				case SIMPLE:
					this.batchListener.onMessage(recordList);
					break;
			}
		}

		private void doInvokeOnMessage(final ConsumerRecord<K, V> recordArg) {
			ConsumerRecord<K, V> record = recordArg;
			if (this.recordInterceptor != null) {
				record = this.recordInterceptor.intercept(record);
			}
			if (record == null) {
				if (this.logger.isDebugEnabled()) {
					this.logger.debug("RecordInterceptor returned null, skipping: " + recordArg);
				}
			}
			else {
				switch (this.listenerType) {
					case ACKNOWLEDGING_CONSUMER_AWARE:
						this.listener.onMessage(record,
								this.isAnyManualAck
										? new ConsumerAcknowledgment(record)
										: null, this.consumer);
						break;
					case CONSUMER_AWARE:
						this.listener.onMessage(record, this.consumer);
						break;
					case ACKNOWLEDGING:
						this.listener.onMessage(record,
								this.isAnyManualAck
										? new ConsumerAcknowledgment(record)
										: null);
						break;
					case SIMPLE:
						this.listener.onMessage(record);
						break;
				}
			}
		}									
	//......	
}

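ListenerConsumer implements the org.springframework.scheduling.SchedulingAwareRunnable interface (which extends Runnable) and the org.springframework.kafka.listener.ConsumerSeekCallback interface.

Its run method calls initAsignedPartitions and then calls pollAndInvoke in a loop for as long as the container is running. A WakeupException is ignored, a NoOffsetForPartitionException breaks out of the loop, any other Exception is passed to handleConsumerException, and an Error triggers emergencyStop and wrapUp before being rethrown.

The pollAndInvoke method calls consumer.poll() and passes any records to invokeListener(records), which ends up in doInvokeBatchOnMessage or doInvokeOnMessage; these call the listener's onMessage method with the arguments that match the listener type.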
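The shape of this loop can be reproduced with JDK classes only. In the sketch below a BlockingQueue stands in for the Kafka consumer and a `java.util.function.Consumer` stands in for the listener; `PollLoopSketch` and `consume` are hypothetical names, and commits, seeks, pausing and error handlers are all omitted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class PollLoopSketch implements Runnable {

    private final BlockingQueue<String> source; // stand-in for the Kafka consumer
    private final Consumer<String> listener;    // stand-in for listener.onMessage
    private volatile boolean running = true;

    PollLoopSketch(BlockingQueue<String> source, Consumer<String> listener) {
        this.source = source;
        this.listener = listener;
    }

    void stop() {
        this.running = false;
    }

    @Override
    public void run() {
        while (this.running) {
            try {
                pollAndInvoke();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (RuntimeException e) {
                // the real loop calls handleConsumerException; the sketch just keeps polling
            }
        }
    }

    private void pollAndInvoke() throws InterruptedException {
        // poll with a timeout so the running flag is re-checked regularly
        String record = this.source.poll(50, TimeUnit.MILLISECONDS);
        if (record != null) {
            this.listener.accept(record);
        }
    }

    // Runs the loop on a worker thread until every record has been delivered.
    static List<String> consume(List<String> records) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(records);
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch delivered = new CountDownLatch(records.size());
        PollLoopSketch loop = new PollLoopSketch(queue, record -> {
            received.add(record);
            delivered.countDown();
        });
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(loop);
        try {
            delivered.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            loop.stop();
            executor.shutdown();
        }
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        System.out.println(consume(List.of("a", "b", "c")));
    }
}
```

As in ListenerConsumer, the loop runs on its own thread and only exits once the running flag is cleared, so stopping it is a matter of flipping the flag and waiting for the current poll to return.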

Summary

KafkaListenerAnnotationBeanPostProcessor collects the bean methods annotated with KafkaListener and calls processKafkaListener for each of them. processKafkaListener wraps the method in a MethodKafkaListenerEndpoint and calls registrar.registerEndpoint(endpoint, factory).

KafkaListenerEndpointRegistry's registerListenerContainer method creates a MessageListenerContainer from the endpoint and factory and stores it in listenerContainers. When startImmediately is true it calls startIfNecessary, which mainly calls listenerContainer.start().

MessageListenerContainer has two main implementations, KafkaMessageListenerContainer and ConcurrentMessageListenerContainer. The latter's start method creates as many KafkaMessageListenerContainer instances as the concurrency value, so in both cases it is KafkaMessageListenerContainer's start method that runs in the end. That method creates a ListenerConsumer and submits it to the task executor. ListenerConsumer mainly runs pollAndInvoke, which polls for records and then calls back the listener's onMessage method.

The overall call chain is KafkaListenerAnnotationBeanPostProcessor --> KafkaListenerEndpointRegistry --> MessageListenerContainer --> GenericMessageListener.onMessage
