如何基于Spring Event事件拓展成“全网事件”?

2022-04-13 16:52:46 浏览数 (1)

导读:今天我们聊一个技术改造,针对Spring的ApplicationEvent事件做下优化。总结本篇文章希望对从事相关工作的同学能够有所帮助或者启发 。

前言

作为合格的Java开发,对Spring中ApplicationEvent事件都非常了解。在同一JVM里面我们可以通过发布对应事件,然后通过监听事件的方式实现单模块代码或逻辑上的优化调整。

比如我们在系统启过程中会监听系统启动监听器虚拟类用来完成 SpringApplicationRunListener监听。

  • 系统内置banner处理
  • 系统内置默认配置处理
  • logbak统一标准的配置

在单体应用中(同一JVM中)通过监听事件的方式实现拓展无疑不错的选择。但是在微服务中如果我们需要发布针对整个系统而言,发布全局事件。在各个微服务集群节点都能接受到呢?

本篇主要分享改造Spring事件的过程分享。结本篇文章希望对从事相关工作的同学能够有所帮助或者启发 。

背景

目前市面上比较火的低代码平台,目前核心就是表单引擎。一个表单可能对应对应一个数据源或多个。在引擎中不可能管理数据源,只能通过各应用模块来拓展实现。

在表单引擎中声明拓展数据源,应用服务独立拓展实现。

实现数据源配置采集来创建可用数据源,首先面对的是两个事情。

  • 应用服务都独立启动(先后顺序),保证数据源配置能实时同步至表单引擎构建源缓存
  • 应用服务和引擎服务启动后,拓展源有变更及时通知引擎层更新数据源

技术选型

采用Redis实现消息队列-发布/订阅模式

由表单引擎提供的Support 集成到业务模块,业务模块实现表单引擎拓展的数据源来上报数据源配置信息(当然为了安全可以对数据库用户名密码进行加密),声明拓展服务必须实现。

@FormDataSourceExtension 数据源拓展

  • 是否支持外部数据源拓展
代码语言:javascript复制
boolean isSupport();
  • 根据数据源标识获取数据源配置
代码语言:javascript复制
FormDataSourceConfig getDataSourceBySourceId(@RequestBody IdVO idVO);
  • 获取模块拓展数据源
代码语言:javascript复制
List<DataSourceConfig> getDataSourceConfigs()

整个大致逻辑如下

基于MQ管理消息队列-发布/订阅模式

原理和上诉原理相似,基础这个选择。在于同事反馈SpringBoot 集成Redis 在某些版本的时候,应用服务集群部署时候,某些节点无法订阅到消息问题,出现了消息丢失。

如果出现这种情况,对应缓存数据源在某些节点是无法创建的,需要每次链接的时候重新创建。这样一来性能问题就上来了所以采用了基于MQ管理消息队列,应用服务发布数据源变更,引擎层监听。

实现方案在使用组件上做下调整即可

方案优化

如果没有那么多的中间件可以使用,采用原生的方式实现。

  • 原生我们该如何实现呢?
  • 或者对于中间件的使用在对于业务模块而言是不是不用那么关注呢?

第一:如果采用原生方案实现,在微服务中我们只能借助注册中心采集到的微服务节点的元数据来一一分发业务。通过应用ID找到所有存活节点来实现远程调用通知。

第二:结合前言中提到的Spring事件,仔细思考下。可否通过申明系统全局的事件,通知到监听的应用节点呢?

当然有,比如在配置中心我们统一修改配置如何分发到所有应用节点配置刷新触发?

发布全局事件
代码语言:javascript复制
/**
 * 发布全网事件,事件必须构造函数ApplicationEvent(String)
 */
public void publishGlobalEvent(ApplicationEvent event) {
        String text = StringHelper.join(event.getClass().getName(), ":",event.getSource());
         // 这里可调整为MQ方式实现
        redisson.getTopic(TOPIC_GLOBAL_EVENT).publish(text);
        log.debug("send:{}", text);
}
▐ 监听全局事件
代码语言:javascript复制
/**
 * 应用配置刷新触发
 */
@Slf4j
@Component
public class ApplicationEnvironmentService
        implements InitializingBean {
    @Autowired
    private RedissonClient redisson;
    @Autowired
    private ContextRefresher contextRefresher;
    @Autowired
    private ApplicationContext applicationContext;

    private static final String KEY_REFRESHENVIRONMENT = "refreshEnvironment";
    private static final String TOPIC_GLOBAL_EVENT = "core:TOPIC_GLOBAL_EVENT";

    @Override
    public void afterPropertiesSet() throws Exception {
        redisson.getTopic(TOPIC_GLOBAL_EVENT).addListener(String.class,
                this::onMessage);
    }

    private void onMessage(CharSequence channel, String message) {
        log.debug("receive:{}", message);
        int index = message.indexOf(':');
        String className = message.substring(0, index);
        String arg = message.substring(index   1);
        if (RefreshScopeRefreshedEvent.class.getName().equals(className)) {
            NamedThreadFactory.async(() -> {
                if (KEY_REFRESHENVIRONMENT.equals(arg)) {
                    doRefreshEnvironment();
                } else {
                    doRefreshAll();
                }
            });
            return;
        }
        Class<?> clazz = ReflectUtil.classForName(className);
        if (clazz == null) {
            return;
        }
        try {
            Constructor<?> constructor = clazz.getConstructor(String.class);
            ApplicationEvent event = (ApplicationEvent) constructor
                    .newInstance(arg);
            applicationContext.publishEvent(event);
        } catch (Exception e) {
            log.warn("事件反序列化失败", e);
        }
    }
}

总结

对技术探索都是循序渐进,对于对外服务提供基础能力。尽可能封装底层逻辑,方便后续升级和改造优化。本文章提到了基础微服务的全局拓展事件,实现原理可能会比直接采用原生的复杂、代码量也会多很多。

但是无意之中提出了“中间件的能力”便于服务与其他应用场景,总结本篇文章希望对从事相关工作的同学能够有所帮助或者启发

- END -

0 人点赞