Asynchronous Servlet 在 Nacos 1.X 动态配置管理中的应用

2022-12-01 21:44:24 浏览数 (1)

Nacos/nɑ:kəʊs/脱胎于阿里巴巴内部的 Config Server、VIPServer 和 Diamond,成长于多年双十一的洪峰考验,沉淀了简单易用、稳定可靠、性能卓越的核心竞争力。于 2018 年正式开源,其核心特性有:服务发现动态配置管理动态 DNS 服务

笔者所在的云网络控制台团队使用 Nacos 来做配置管理,那么以 Nacos 为代表的配置中心究竟解决了哪些痛点问题呢?配置,作为代码如影随形的小伙伴,伴随着应用的整个生命周期,一般有三种形式:1) 硬编码,配置项通过类字段来承载;可以暴露 API 实现动态变更,但配置变更是发生在堆内存中的,没有持久化,一旦应用重启,配置项会回退到代码中的默认值;此外,如果有多个应用实例,自然需要逐一调用 API 变更配置,运维成本可想而知,极其蛋疼。2) 配置文件,配置项通过配置文件来承载,如 Spring 中的 properies、yaml 文件;配置文件解决了持久化的问题,但配置变更需要登录机器手动维护外部配置文件,然后重启应用,运维成本依然很高。3) 配置表,配置项保存在数据库中的配置表内;这种形式将配置从应用中抽离出来进行集中管理,可以有效地降低运维成本;但需要额外的定时任务拉取变更后的配置项,不够优雅。这么一合计,关于配置的痛点问题也就呼之欲出了,分别是:动态变更、持久化和运维成本

笔者在 2017 年曾参与集团委派的 Prometheus 调研项目,在一次赴京汇报中,一位博士大佬问我:Prometheus Server 与一众 Exporter 是如何进行数据交互的?我回答道:Exporter 会不断拉取 Prometheus Server 中的 metrics;接着大佬追问:为什么选用拉模式,拉模式与推模式有何区别呢?当时回答地比较片面。的确,数据交互有两种模式:Push (推模式) 和 Pull (拉模式)。推模式指的是客户端与服务端建立长链接后,服务端向客户端推送数据;这种方式实时性高,但如果客户端的数据消费能力较弱,而且没有相应的反压 (Back Pressure) 机制,那十有八九会导致客户端数据积压。拉模式指的是客户端主动向服务端发起数据拉取的请求;其优点是客户端将拉取的节奏掌握自己手中,不会导致数据积压,但实时性会比较差。

无论是在 Nacos 1.X 亦或是 2.X 中,Config Server 与 Config Client 针对动态配置项的交互模型均是基于Pull模式的。但在 1.X 版本中,Config Client 并没有与 Config Server 建立所谓的长链接 (Long Connection),而是通过 长轮询 (Long Polling) 来模拟长链接。接下来,让我们一起来简单地学习下长轮询在 Nacos 动态配置管理中的落地思路。

1 长轮询

在长轮询中,当服务端收到客户端的请求后,服务端会一直挂起链接 (Connection),直到服务端有可以响应给客户端的数据,才会关闭链接,然后客户端再次发起请求,周而复始 ··· 废话不多说,上车,噢不,上图!

2 Asynchronous Servlet

在 Servlet 3.0 之前,Servlet 完全遵循同步阻塞I/O模型,这意味着一个 HTTP 请求对应一个 Servlet Container 线程,即每一个 HTTP 请求都是在各自线程上下文中完成处理的,这也正是ThreadLocal的用武之地。Servlet 3.0 引入了一种异步处理请求的能力,即Asynchronous Servlet。为什么要有 Asynchronous Servlet 呢?Tomcat 线程池默认最多有 200 个工作线程,由于每一个线程都会消耗服务器的硬件资源,也就是说线程数量肯定是有上限的,一旦业务逻辑又臭又长,那么在并发请求较多的场景,必然会出现线程池中已经没有多余的线程来处理 HTTP 请求的情形,因为线程都在忙,没有办法返回线程池中,这就是经典的线程饥饿问题。怎么办呢?Asynchronous Servlet 的核心思想是将 Servlet Container 中的工作线程与复杂耗时的业务逻辑相解耦,试图通过另一个线程池来承载这些耗时逻辑,使得工作线程可以快速地返回到 Servlet Container 线程池中,以处理新的 HTTP 请求。尽管工作线程已经返回到 Servlet Container 线程池中,但 Servlet Container 并不会断开与客户端的链接,否则怎么给客户端响应数据啊,它会一直挂起客户端链接,最终由 Servlet Container 之外的业务线程池中的线程来响应客户端。

Asynchronous Servlet 用起来很简单,首先通过 HttpServletRequest 的startAsync()方法初始化AsyncContext实例,同时标志着当前 Servlet Container 中的工作线程会尽早返回到线程池中;然后将 AsyncContext 实例传递到业务线程池中的线程;最后,当复杂的计算逻辑完成后,业务线程池中的线程调用 AsyncContext 的complete()方法实现对客户端的响应。

虽然 Servlet 3.0 对请求的处理是异步的,但是面向 ServletInputStreamServletOutputStream的 I/O 操作却依然是阻塞的,想象一下,如果请求体的体量很大,Servlet Container 中的工作线程必然面临不必要的等待,这是另一个影响工作线程尽早返回到 Servlet Container 线程池的因素。为了解决这个问题,Servlet 3.1 又引入了Non-blocking I/O,主要是向 ServletInputStream 和 ServletOutputStream 中注册ReadListenerWriteListener回调接口。

笔者总感觉 Asynchronous Servlet 思路有点怪怪的,难道重新开了一个业务线程就不会造成业务线程的饥饿问题吗?此外,Asynchronous Servlet 与基于长轮询的客户端 Pull 方案简直绝配啊,有没有?

3 Nacos 配置管理之长轮询源码解读

Nacos 2.X 针对动态配置的交互方案进行了升级,通过 GRPC 来真正地实现长链接,进一步压榨通信效率。因此,笔者这里只能基于 Nacos 1.X 进行源码解读。首先,查看一下标签信息,然后基于 1.4.3 标签创建一同名本地分支。

代码语言:javascript复制
git tag --list
git checkout -b 1.4.3 1.4.3 (git branch 1.4.3 1.4.3)

3.1 跑起来

理解开源项目源码最快的方式无疑是跑起来,那么如何在 Intellij Idea 中单独将 nacos-config 模块跑起来呢?

3.1.1 配置启动参数
代码语言:javascript复制
-Dnacos.standalone=true -Dnacos.functionMode=config
3.1.2 改造 AuthConfig

AuthConfig 位于 nacos-core 模块中,将其头顶上的@Configuration注解拿掉即可。

3.1.3 移除 Spring Security 依赖

nacos-config 模块依赖于 nacos-core 模块,而 nacos-core 模块引入了 Spring Security,去掉它可以方便咱们调试。

代码语言:javascript复制
<dependency>
    <groupId>${project.groupId}</groupId>
    <artifactId>nacos-core</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-security</artifactId>
        </exclusion>
    </exclusions>
</dependency>
3.1.4 追加 application.properties 配置文件

在 nacos-config 模块中追加 application.properties 配置文件,内容如下:

代码语言:javascript复制
server.servlet.contextPath=/nacos
server.port=8848

启动吧~

3.2 Config Client 如何发起长轮询

如果希望 Config Client 动态拉取到 Config Server 中的变更配置项,则需要向 Config Client 注册监听器 (Listener) 以用于回调。示例如下:

代码语言:javascript复制
Properties properties = new Properties();
properties.put("serverAddr", serverAddr);
properties.put("namespace", tenant);
ConfigService configService = NacosFactory.createConfigService(properties);
configService.addListener(dataId, group, new AbstractListener() {
    @Override
    public void receiveConfigInfo(String configInfo) {}
});

ConfigService是 nacos-client 模块对外暴漏配置管理能力的顶层抽象,针对配置与监听器的增删改查逻辑都在这里了。NacosConfigService是其唯一的实现类,对 Nacos Client 角色而言,只需全局构建一个 NacosConfigService 实例即可。NacosConfigService 实例可以由NacosFactory构建,注意在通过 NacosFactory 构建 NacosConfigService 实例的过程中,Properties 入参中必须要有两个属性,分别是:serverAddrnamespace。其实相较于addListener()方法,NacosConfigService 实例构造逻辑不仅易被忽略,而且也更为有料。咱们先来看看 addListener() 方法中的内容吧:

代码语言:javascript复制
public class NacosConfigService implements ConfigService {
    private final ClientWorker worker;

    public void addListener(String dataId, String group, Listener listener) throws NacosException {
        worker.addTenantListeners(dataId, group, Arrays.asList(listener));
    }
}

public class ClientWorker {
    private final ConcurrentHashMap<String, CacheData> cacheMap = new ConcurrentHashMap<String, CacheData>();

    public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException {
        group = blank2defaultGroup(group);
        String tenant = agent.getTenant();
        CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
        for (Listener listener : listeners) {
            cache.addListener(listener);
        }
    }

    public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
        String key = GroupKey.getKeyTenant(dataId, group, tenant);
        CacheData cacheData = cacheMap.get(key);
        if (cacheData != null) {
            return cacheData;
        }
        cacheData = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
        CacheData lastCacheData = cacheMap.putIfAbsent(key, cacheData);
        if (lastCacheData == null) {
            // ParamUtil.getPerTaskConfigSize() 值为 3000
            int taskId = cacheMap.size() / (int) ParamUtil.getPerTaskConfigSize();
            cacheData.setTaskId(taskId);
            lastCacheData = cacheData;
        }
        return lastCacheData;
    }
}

也许从CacheData入手,对理解上述内容会有一些帮助。CacheData 承载了对监听器的管理任务,CacheData 除了封装 dataId、group、namespace 和 md5 等信息外,还通过 CopyOnWriteArrayList来维护监听器列表,但无论一个 CacheData 实例中有多少监听器,这些监听器均拥有相同的 dataId group namespace 组合,因此在ClientWorker看来,一个 CacheData 实例才真正对应一个监听器;此外,监听器的回调逻辑也是在 CacheData 中完成的。ClientWorker 中持有一 ConcurrentHashMap<String, CacheData> 类型的成员变量 cacheMap,其 key 按照 dataId group namespace 这一规则拼接而成,也就是说 dataId、group 和 namespace 与 CacheData 实例是一一对应的;此外,一个 CacheData 实例要想被填充到 ClientWorker 中的 cacheMap 中,只能通过 ConfigService 的addListener()方法来实现。综上分析,这就是为什么 cacheMap.size() 可以表征监听器数量的原因。坦白来说,一个 Spring Boot 应用就应该对应一个 dataId group namespace,但万一哪个大聪明给一个 Spring Boot 应用搞了几百上千个 dataId group namespace 组合,那难道要为每一组 dataId group namespace 发起一个长轮询任务吗?那 executorService 线程池中的线程肯定不够用了,而且这对 Nacos Server 也是一个巨大的压力,显然不能这么搞。在上述源码中,ParamUtil.getPerTaskConfigSize() 的值为 3000,其实就是以 3000 组 dataId group namespace 为维度去发起一次长轮询任务,这很严谨!

从上述内容来看,addListener() 方法主要用来填充 ClientWorker 中的 cacheMap。具体地,如果 cacheMap 中存在 key 为 dataId group namespace 的 CacheData 实例,则直接填充该 CacheData 实例的监听器列表;否则构建一个 CacheData 实例,同时为该 CacheData 实例设定 taskId 属性值。

接下来,一起分析下 NacosConfigService 实例构建过程中的那些干货吧,如下:

代码语言:javascript复制
public class NacosFactory {
    public static ConfigService createConfigService(Properties properties) throws NacosException {
        return ConfigFactory.createConfigService(properties);
    }
}

public class ConfigFactory {
    public static ConfigService createConfigService(Properties properties) throws NacosException {
        try {
            Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
            Constructor constructor = driverImplClass.getConstructor(Properties.class);
            ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
            return vendorImpl;
        } catch (Throwable e) {
            throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
        }
    }
}

public class NacosConfigService implements ConfigService {
    private final HttpAgent agent;
    private final ClientWorker worker;
    private String namespace;

    public NacosConfigService(Properties properties) throws NacosException {
        ValidatorUtils.checkInitParam(properties);
        initNamespace(properties);
        this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
        this.agent.start();
        this.worker = new ClientWorker(this.agent, properties);
    }
    private void initNamespace(Properties properties) {
        namespace = ParamUtil.parseNamespace(properties);
        properties.put(PropertyKeyConst.NAMESPACE, namespace);
    }
}

public class ClientWorker {
    private final ConcurrentHashMap<String, CacheData> cacheMap = new ConcurrentHashMap<String, CacheData>();
    final ScheduledExecutorService executor;
    final ScheduledExecutorService executorService;
    private final HttpAgent agent;

    public ClientWorker(final HttpAgent agent, final Properties properties) {
        this.agent = agent;
        this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker."   agent.getName());
                t.setDaemon(true);
                return t;
            }
        });

        this.executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker.longPolling."   agent.getName());
                t.setDaemon(true);
                return t;
            }
        });

        this.executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("["   agent.getName()   "] [sub-check] rotate check error", e);
                }
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
    }
} 

不要被这么一大坨代码吓到,都是一些很直白的逻辑。从内容来看,ClientWorker 实例也会一并完成构建,它会创建两个ScheduledExecutorService类型的线程池,executor 线程池中只有一个核心线程,它主要定时执行checkConfigInfo()方法中的逻辑,延迟 1 毫秒后,第一次执行 checkConfigInfo() 方法,当该方法执行完毕,10 毫秒后再度执行 checkConfigInfo() 方法逻辑,周而复始;而 executorService 线程池中核心线程数相对较多,具体取决于 Runtime.getRuntime().availableProcessors(),但该线程池的职责这里还看不出来。

继续,跟进到 checkConfigInfo() 方法中的逻辑:

代码语言:javascript复制
public class ClientWorker {
    public void checkConfigInfo() {
        int listenerSize = cacheMap.size();
        int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        if (longingTaskCount > currentLongingTaskCount) {
            for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i  ) {
                executorService.execute(new LongPollingRunnable(i));
            }
            currentLongingTaskCount = longingTaskCount;
        }
    }
}

checkConfigInfo() 方法源码中引出了 executorService 这一线程池的任务主体,即LongPollingRunnable。LongPollingRunnable 是 ClientWorker 中一内部类,从其名称来看,长轮询的逻辑大概是要现身了。

继续往下看:

代码语言:javascript复制
public class ClientWorker {
    class LongPollingRunnable implements Runnable {
        private final int taskId;
        public LongPollingRunnable(int taskId) {
            this.taskId = taskId;
        }

        @Override
        public void run() {
            List<CacheData> cacheDatas = new ArrayList<CacheData>();
            try {
                for (CacheData cacheData : cacheMap.values()) {
                    if (cacheData.getTaskId() == taskId) {
                        cacheDatas.add(cacheData);
                    }
                }
                // 核心逻辑一 :checkUpdateDataIds()
                List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas);
                for (String groupKey : changedGroupKeys) {
                    String[] key = GroupKey.parseKey(groupKey);
                    String dataId = key[0];
                    String group = key[1];
                    String tenant = key[2];
                    try {
                        ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L);
                        CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
                        cache.setContent(response.getContent());
                        cache.setType(response.getConfigType());
                    } catch (NacosException ioe) {
                        LOGGER.error("get changed config exception : ", ioe);
                    }
                }
                for (CacheData cacheData : cacheDatas) {
                    // 核心逻辑二 :checkListenerMd5()
                    cacheData.checkListenerMd5();
                }
                executorService.execute(this);
            } catch (Throwable e) {
                executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
            }
        }
    }
}

上述 LongPollingRunnable 中的长轮询逻辑有两处比较重要,分别是:checkUpdateDataIds()checkListenerMd5(),必须逐一击破!

checkUpdateDataIds()

代码语言:javascript复制
public class ClientWorker {
    List<String> checkUpdateDataIds(List<CacheData> cacheDatas) throws Exception {
        StringBuilder sb = new StringBuilder();
        for (CacheData cacheData : cacheDatas) {
            if (!cacheData.isUseLocalConfigInfo()) {
                sb.append(cacheData.dataId).append(WORD_SEPARATOR);
                sb.append(cacheData.group).append(WORD_SEPARATOR);
                if (StringUtils.isBlank(cacheData.tenant)) {
                    sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
                } else {
                    sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
                    sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
                }
            }
        }
        return checkUpdateConfigStr(sb.toString());
    }
}

checkUpdateDataIds()方法首先根据 dataId、group、md5 和 tenant (namespace) 拼接出一个字符串,然后交给checkUpdateConfigStr()方法处理。

checkUpdateConfigStr() 方法内容如下:

代码语言:javascript复制
public class ClientWorker {
    List<String> checkUpdateConfigStr(String probeUpdateString) throws Exception {
        
        Map<String, String> params = new HashMap<String, String>(2);
        params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
        Map<String, String> headers = new HashMap<String, String>(2);
        headers.put("Long-Pulling-Timeout", ""   timeout);
        
        if (StringUtils.isBlank(probeUpdateString)) {
            return Collections.emptyList();
        }
        
        try {
            long readTimeoutMs = timeout   (long) Math.round(timeout >> 1);
            HttpRestResult<String> result = agent
                    .httpPost("/v1/cs/configs/listener", headers, params, agent.getEncode(), readTimeoutMs);
            if (result.ok()) {
                return parseUpdateDataIdResponse(result.getData());
            } else {
                LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(),
                        result.getCode());
            }
        } catch (Exception e) {
            throw e;
        }
        return Collections.emptyList();
    }
}

显然,checkUpdateConfigStr() 方法逻辑中涉及与POST /v1/cs/configs/listener这一 RESTful 接口交互,它是由 config-server 提供的、负责处理长轮询的核心接口,主要用于判断配置项是否发生变更,如果配置项已变更,则返回一串由 dataId、group 和 tenant (namespace) 拼接而成的字符串;否则,则啥都不返回。

checkListenerMd5()

既然监听器是填充到了 CacheData 中,那监听器的回调自然也在这里了。如下所示:

代码语言:javascript复制
public class CacheData {
    public final String dataId;
    public final String group;
    public final String tenant;
    private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;
    private volatile String md5;
    private volatile String content;
    private int taskId;
    private String type;

    void checkListenerMd5() {
        for (ManagerListenerWrap wrap : listeners) {
            if (!md5.equals(wrap.lastCallMd5)) {
                safeNotifyListener(dataId, group, content, type, md5, wrap);
            }
        }
    }

    private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
                                    final String md5, final ManagerListenerWrap listenerWrap) {
        final Listener listener = listenerWrap.listener;

        Runnable job = new Runnable() {
            @Override
            public void run() {
                try {
                    ConfigResponse cr = new ConfigResponse();
                    cr.setDataId(dataId);
                    cr.setGroup(group);
                    cr.setContent(content);
                    String contentTmp = cr.getContent();
                    listener.receiveConfigInfo(contentTmp);

                    // compare lastContent and content
                    if (listener instanceof AbstractConfigChangeListener) {
                        Map data = ConfigChangeHandler.getInstance().parseChangeData(listenerWrap.lastContent, content, type);
                        ConfigChangeEvent event = new ConfigChangeEvent(data);
                        ((AbstractConfigChangeListener) listener).receiveConfigChange(event);
                        listenerWrap.lastContent = content;
                    }

                    listenerWrap.lastCallMd5 = md5;
                } catch (NacosException ex) {}
            }
        };
        job.run();
    }
}

3.3 Config Server 如何处理长轮询

刚才提到POST /v1/cs/configs/listener这一 RESTful 接口是由 config-server 提供的、负责处理长轮询的核心接口,那自然要从它入手来摸清 Config Server 应对长轮询的思路。

代码语言:javascript复制
@RestController
@RequestMapping("/v1/cs/configs")
public class ConfigController {
    private final ConfigServletInner inner;

    @Autowired
    public ConfigController(ConfigServletInner configServletInner, PersistService persistService,
                            ConfigSubService configSubService) {
        this.inner = configServletInner;
        this.persistService = persistService;
        this.configSubService = configSubService;
    }

    /**
     * The client listens for configuration changes.
     */
    @PostMapping("/listener")
    @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
    public void listener(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
        String probeModify = request.getParameter("Listening-Configs");
        if (StringUtils.isBlank(probeModify)) {
            throw new IllegalArgumentException("invalid probeModify");
        }

        probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);

        Map<String, String> clientMd5Map;
        try {
            clientMd5Map = MD5Util.getClientMd5Map(probeModify);
        } catch (Throwable e) {
            throw new IllegalArgumentException("invalid probeModify");
        }

        inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
    }
}

卧槽,上面这些代码咱们熟的很啊,天天写。

紧接着跟进到ConfigServletInner中一探究竟:

代码语言:javascript复制
@Service
public class ConfigServletInner {
    @Autowired
    private LongPollingService longPollingService;

    public String doPollingConfig(HttpServletRequest request, HttpServletResponse response, Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
        // Long polling.
        if (LongPollingService.isSupportLongPolling(request)) {
            longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
            return HttpServletResponse.SC_OK   "";
        }

        // Ignore short polling logic.
    }
}

显然,ConfigServletInner 的 doPollingConfig() 方法同时兼容长短轮询的处理逻辑,如果请求头含有Long-Pulling-Timeout字段,那么就按照长轮询来处理,否则将该请求视为短轮询 (当然短轮询逻辑笔者没有放出来)。

继续往下看,LongPollingService中的长轮询处理逻辑:

代码语言:javascript复制
@Service
public class LongPollingService {
    public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map, int probeRequestSize) {
        String str = req.getHeader("Long-Pulling-Timeout");
        int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
        long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
        List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
        if (changedGroups.size() > 0) {
            generateResponse(req, rsp, changedGroups);
            return;
        }
        String ip = RequestUtil.getRemoteIp(req);

        // Must be called by http thread, or send response.
        final AsyncContext asyncContext = req.startAsync();

        // AsyncContext.setTimeout() is incorrect, Control by oneself
        asyncContext.setTimeout(0L);

        ConfigExecutor.executeLongPolling(
                new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout));
    }
}

class ClientLongPolling implements Runnable {
    
    final AsyncContext asyncContext;
    final Map<String, String> clientMd5Map;
    final long createTime;
    final String ip;
    final int probeRequestSize;
    final long timeoutTime;

    @Override
    public void run() {
        asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
            @Override
            public void run() {
                try {
                    // Delete subsciber's relations.
                    boolean removeFlag = allSubs.remove(ClientLongPolling.this);

                    if (removeFlag) {
                        sendResponse(null);
                    }
                } catch (Throwable t) {
                    LogUtil.DEFAULT_LOG.error("long polling error:"   t.getMessage(), t.getCause());
                }

            }

        }, timeoutTime, TimeUnit.MILLISECONDS);

        allSubs.add(this);
    }

    void sendResponse(List<String> changedGroups) {

        // Cancel time out task.
        if (null != asyncTimeoutFuture) {
            asyncTimeoutFuture.cancel(false);
        }
        generateResponse(changedGroups);
    }

    void generateResponse(List<String> changedGroups) {
        if (null == changedGroups) {
            // Tell web container to send http response.
            asyncContext.complete();
            return;
        }

        HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();

        try {
            final String respString = MD5Util.compareMd5ResultString(changedGroups);

            // Disable cache.
            response.setHeader("Pragma", "no-cache");
            response.setDateHeader("Expires", 0);
            response.setHeader("Cache-Control", "no-cache,no-store");
            response.setStatus(HttpServletResponse.SC_OK);
            response.getWriter().println(respString);
            asyncContext.complete();
        } catch (Exception ex) {
            asyncContext.complete();
        }
    }
}

在上述内容中,addLongPollingClient() 方法交代了三件事:

  1. Config Client 在访问POST /v1/cs/configs/listener接口时,会携带一个 key 为Long-Pulling-Timeout、value 为 30000 的请求头信息,然后通过 30000 - 500 得到一个超时时间,即 29.5 秒。
  2. 根据 Config Client 的请求参数来计算出已变更的配置项;具体地,根据请求参数中 dataId group tenant (namespace) 向 Config Server 获取当前配置项的 MD5 值,然后和请求参数中的 MD5 值比较,如果不相等,则认为配置已变更,那么就将请求参数中的 dataId group tenant (namespace) 追加到changedGroups中;
  3. 如果 changedGroups 非空,那么立即将 changedGroups 中的变更配置响应给 Config Client。
  4. 否则,启用 Asynchronous Servlet,交由ConfigExecutor中的LONG_POLLING_EXECUTOR线程池来延迟响应 Config Client,即 29.5 秒后响应 Config Client,当然由于没有配置发生变更,自然响应给 Config Client 的内容也就是空的了;此外,LONG_POLLING_EXECUTOR 线程池是由Executors.newScheduledThreadPool(1, threadFactory)生成的ScheduledExecutorService,虽然该线程池核心线程数只有一,却可以应对一众 Config Client。

4 总结

文中代码在源码基础上做了改动,移除了一些与本文主题不相干的逻辑,比如:本地文件快照,完整内容请移步官方仓库。希望大家在读完本文后,在实际工作中如果遇到基于长轮询的客户端 Pull 场景,不要忽略了 Asynchronous Servlet

参考文档

  1. Asynchronous Processing https://docs.oracle.com/javaee/7/tutorial/servlets012.htm
  2. Nonblocking I/O https://docs.oracle.com/javaee/7/tutorial/servlets013.htm
  3. https://nacos.io/zh-cn/blog/index.html

0 人点赞