2021-03-02 16:27:33
浏览数 (1)
- org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration
代码语言:java
// 根据注解查找所有bean
/**
 * Looks up every bean carrying {@code @RocketMQMessageListener} and registers a listener
 * container for each of them.
 *
 * <p>Beans whose name is reported as a scoped target by
 * {@code ScopedProxyUtils.isScopedTarget} are left out before registration.
 */
@Override
public void afterSingletonsInstantiated() {
    Map<String, Object> annotatedBeans =
        this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);

    // Drop scoped-target bean names, then copy the remaining entries into a fresh map.
    Map<String, Object> listenerBeans = annotatedBeans.entrySet().stream()
        .filter(candidate -> !ScopedProxyUtils.isScopedTarget(candidate.getKey()))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

    for (Map.Entry<String, Object> listener : listenerBeans.entrySet()) {
        registerContainer(listener.getKey(), listener.getValue());
    }
}
/**
 * Registers and starts a {@code DefaultRocketMQListenerContainer} for one listener bean.
 *
 * <p>The consumer group and topic are read from the bean class's
 * {@code @RocketMQMessageListener} annotation. If the application configuration disables
 * that group/topic pair, the method logs at debug level and returns without registering
 * anything.
 *
 * @param beanName name of the listener bean; only used in the final log message
 * @param bean     the listener bean instance handed to the container factory
 * @throws RuntimeException wrapping any exception thrown while starting the container
 */
private void registerContainer(String beanName, Object bean) {
Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
// NOTE(review): the line below is an elision marker from the excerpt this code was copied
// from; the statements that stood here are not visible and must be restored from upstream.
...
RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
// Resolve the consumer group and topic, expanding any placeholders via the environment.
String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
String topic = this.environment.resolvePlaceholders(annotation.topic());
// Check whether the application configuration enables this group/topic pair.
// A missing group or a missing topic entry both fall back to "enabled" (true).
boolean listenerEnabled =
(boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
.getOrDefault(topic, true);
if (!listenerEnabled) {
log.debug(
"Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
consumerGroup, topic);
return;
}
// Validate the annotation. Per the original author's note this rejects a conflict between
// message model and consume mode (broadcasting cannot be consumed orderly); validate()
// itself is not visible here -- confirm against its definition.
validate(annotation);
// Container bean names are the container class name plus an incrementing counter value.
String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
counter.incrementAndGet());
GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
// Register the container bean; the instance is produced by the supplier below.
genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
() -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
// Look up the container bean that was just registered and start it if it is not running.
DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
DefaultRocketMQListenerContainer.class);
if (!container.isRunning()) {
try {
container.start();
} catch (Exception e) {
// Log the failure, then rethrow unchecked with the original exception as the cause.
log.error("Started container failed. {}", container, e);
throw new RuntimeException(e);
}
}
log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}
/**
 * Builds a {@code DefaultRocketMQListenerContainer} configured from a listener annotation.
 *
 * <p>Name server, access channel, topic, selector expression and consumer group are read
 * from the annotation with placeholders resolved through the environment. An empty name
 * server falls back to the value in {@code rocketMQProperties}; an empty access channel or
 * selector expression is simply not set on the container.
 *
 * @param name       name assigned to the container
 * @param bean       listener bean; attached as a {@code RocketMQListener} or, failing
 *                   that, as a {@code RocketMQReplyListener}
 * @param annotation the listener annotation supplying the settings
 * @return the configured container
 */
private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
    RocketMQMessageListener annotation) {
    DefaultRocketMQListenerContainer listenerContainer = new DefaultRocketMQListenerContainer();
    listenerContainer.setRocketMQMessageListener(annotation);

    // Prefer the name server declared on the annotation; otherwise use the global one.
    String resolvedNameServer = environment.resolvePlaceholders(annotation.nameServer());
    if (StringUtils.isEmpty(resolvedNameServer)) {
        resolvedNameServer = rocketMQProperties.getNameServer();
    }
    String resolvedAccessChannel = environment.resolvePlaceholders(annotation.accessChannel());
    listenerContainer.setNameServer(resolvedNameServer);
    if (!StringUtils.isEmpty(resolvedAccessChannel)) {
        listenerContainer.setAccessChannel(AccessChannel.valueOf(resolvedAccessChannel));
    }

    String resolvedTopic = environment.resolvePlaceholders(annotation.topic());
    listenerContainer.setTopic(resolvedTopic);

    String selector = environment.resolvePlaceholders(annotation.selectorExpression());
    if (!StringUtils.isEmpty(selector)) {
        listenerContainer.setSelectorExpression(selector);
    }

    String resolvedGroup = environment.resolvePlaceholders(annotation.consumerGroup());
    listenerContainer.setConsumerGroup(resolvedGroup);

    // Attach the bean under whichever listener interface it implements, checking
    // RocketMQListener first.
    Class<?> listenerType = bean.getClass();
    if (RocketMQListener.class.isAssignableFrom(listenerType)) {
        listenerContainer.setRocketMQListener((RocketMQListener) bean);
    } else if (RocketMQReplyListener.class.isAssignableFrom(listenerType)) {
        listenerContainer.setRocketMQReplyListener((RocketMQReplyListener) bean);
    }

    listenerContainer.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
    listenerContainer.setName(name);
    return listenerContainer;
}