Dubbo 优雅停机

2022-05-13 17:28:35 浏览数 (1)

之前的几个章节都在讲解Dubbo的种种流程性的逻辑,首先讲到了服务启动和服务调用,然后又讲到了服务治理的一些内容。作为一个成熟的RPC框架,这些都是必要的内容,但是有一点往往是容易被人忽略的,那就是优雅停机。今天我们就一起来看一下Dubbo对于优雅停机的一些支持性动作。

优雅停机主要用在服务版本迭代上线的过程中,比如我们发布了新的服务版本,经常性是直接替换线上正在跑的服务,这个时候如果在服务切换的过程中老的服务没有正常关闭的话,容易造成内存清理问题,所以优雅停机也是重要的一环。 Dubbo的优雅停机是依赖于JDK的ShutdownHook函数,下面先了解一下JDK的ShutdownHook函数会在哪些时候生效:

  • 程序正常退出
  • 程序中使用System.exit()退出JVM
  • 系统发生OutofMemory异常
  • 使用kill pid干掉JVM进程的时候(kill -9时候是不能触发ShutdownHook生效的)

Dubbo优雅停机代码解读

dubbo的优雅停机代码入口就在于AbstractConfig的静态代码块中:

代码语言:javascript复制
    static {
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                if (logger.isInfoEnabled()) {
                    logger.info("Run shutdown hook now.");
                }
                ProtocolConfig.destroyAll();
            }
        }, "DubboShutdownHook"));
    }
    
    //在停机的时候往往要注意的是:此时服务器很大可能性既是consumer又是provider,所以要在两方面都进行一定的处理
    public static void destroyAll() {
        // 关闭所有注册中心,清空注册中心的内容。
        // 关闭zk连接,这时候consumer端从zk上已经找不到关闭的服务了
        // 取消所有的注册和订阅关系,作为consumer则不再监听数据变更,作为provider则简单断开于zk的连接
        AbstractRegistryFactory.destroyAll();
        ExtensionLoader<Protocol> loader = ExtensionLoader.getExtensionLoader(Protocol.class);
        for (String protocolName : loader.getLoadedExtensions()) {
            try {
                Protocol protocol = loader.getLoadedExtension(protocolName);
                if (protocol != null) {
                    //关闭Server
                    protocol.destroy();
                }
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    }
    
    /**
     * 关闭所有已创建注册中心
     */
    public static void destroyAll() {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Close all registries "   getRegistries());
        }
        // 锁定注册中心关闭过程,防止一个注册中心被多次关闭
        LOCK.lock();
        try {
            for (Registry registry : getRegistries()) {
                try {
                    //以zk注册中心为例讲解,zkRegistry->FailbackRegistry->AbstractRegistry
                    registry.destroy();
                } catch (Throwable e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
            REGISTRIES.clear();
        } finally {
            // 释放锁
            LOCK.unlock();
        }
    }
    
    //ZookeeperRegistry.destory() 
    //可以看到的是这一层关闭的核心就是关闭zkClient
    public void destroy() {
        super.destroy();
        try {
            zkClient.close();
        } catch (Exception e) {
            logger.warn("Failed to close zookeeper client "   getUrl()   ", cause: "   e.getMessage(), e);
        }
    }
    
    //FailbackRegistry.destory()
    //首先要明白FailbackRegistry的核心就在于失败重试,所以这一层的关闭只要关闭retryFuture就可以
    public void destroy() {
        super.destroy();
        try {
            retryFuture.cancel(true);
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
    }
    
    //AbstractRegistry.destory()
    //处理通用的destory逻辑
    public void destroy() {
        if (logger.isInfoEnabled()){
            logger.info("Destroy registry:"   getUrl());
        }
        //作为provider,取消所有的服务注册
        Set<URL> destroyRegistered = new HashSet<URL>(getRegistered());
        if (! destroyRegistered.isEmpty()) {
            for (URL url : new HashSet<URL>(getRegistered())) {
                if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
                    try {
                        //从已注册的列表中移除该URL
                        unregister(url);
                        if (logger.isInfoEnabled()) {
                            logger.info("Destroy unregister url "   url);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to unregister url "   url   " to registry "   getUrl()   " on destroy, cause: "   t.getMessage(), t);
                    }
                }
            }
        }
        //作为consumer,取消所有的订阅关系
        Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
        if (! destroySubscribed.isEmpty()) {
            for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    try {
                        //将listener从订阅者对应的listener集合中移除(监听的服务变更将不再进行通知)
                        unsubscribe(url, listener);
                        if (logger.isInfoEnabled()) {
                            logger.info("Destroy unsubscribe url "   url);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to unsubscribe url "   url   " to registry "   getUrl()   " on destroy, cause: "  t.getMessage(), t);
                    }
                }
            }
        }
    }

下面总结一下AbstractRegistryFactory.destroyAll()做的所有事情:

断开于zk的连接。 默认情况下服务端的dynamic为true,也就是dubbo自己管理服务的注册,所以这时候会在zk节点上创建临时的URL节点信息,在客户端与zk端开之后,zk监测到连接关闭,所以客户端创建的临时节点信息也会直接移除(zk临时节点的特性)。作为provider,这时候在zk节点上已经没有自己的信息了,所以这时候consumer理论上已经不会再看到该provider的信息了,也就是说不会有新的请求在过来,但是如果集群比较庞大的话,可能不止有一个zk节点,这时候可能依然会有请求过来。作为consumer,因为consumer在zk上注册的为持久节点,所以在连接断开时候并不会删除该节点,只是会移除对应的监听器。但是这里有一个容易忽略的问题就是,服务端注册的节点在zk上并不会删除,那么下次当consumer再次subscribe的时候依然后创建该节点,这时候因为该节点在上次停机的时候已经创建过了,重新创建就会抛异常了,这要怎么处理?哈哈 ,Dubbo的做法是直接捕获NodeExistsException然后什么都不做,如果出现该异常了默认就是创建成功,只不过会再次重新注册监听器而已。

接下来就看一下Protocol.destory(),因为Protocol的实现类中主要分为两类,一类是RegistryProtoocl,另外一类是可扩展的Protocol(DubboProtocol)。

代码语言:javascript复制
    //RegistryProtocol.destory()
    public void destroy() {
        List<Exporter<?>> exporters = new ArrayList<Exporter<?>>(bounds.values());
        for(Exporter<?> exporter :exporters){
            exporter.unexport();
        }
        bounds.clear();
    }
    
    //DubboExporter.destory()
    //主要是将exporter从对应的exporterMap.remove中移除
    public void unexport() {
        super.unexport();
        exporterMap.remove(key);
    }
    
    //AbstractExporter.destory()
    //将exporter中引用的invoker进行destroy调用,因为Invoker有包装类,所以在ExtensionLoader加载的时候实际上会加上包装。
    public void unexport() {
        if (unexported) {
            return ;
        }
        unexported = true;
        getInvoker().destroy();
    }
    
    //DubboInvoker
    public void destroy() {
        //防止client被关闭多次.在connect per jvm的情况下,client.close方法会调用计数器-1,当计数器小于等于0的情况下,才真正关闭
        if (super.isDestroyed()){
            return ;
        } else {
            //dubbo check ,避免多次关闭
            destroyLock.lock();
            try{
                if (super.isDestroyed()){
                    return ;
                }
                super.destroy();
                if (invokers != null){
                    invokers.remove(this);
                }
                for (ExchangeClient client : clients) {
                    try {
                        client.close();
                    } catch (Throwable t) {
                        logger.warn(t.getMessage(), t);
                    }
                }
                
            }finally {
                destroyLock.unlock();
            }
        }
    }

RegistryProtocol中文翻译就是注册协议,注册协议只关心跟注册有关的内容,而Exporter和Invoker都是RegistryProtocol下层的内容,所以在调用注册协议关闭服务的时候会讲其下的Exporter和Invoker都关闭掉。 注册协议和服务协议的区别就是注册协议只关系服务注册的相关逻辑,而不会考虑服务暴露,服务引用的一些内容,这些内容要在DubboProtocol中去处理:

代码语言:javascript复制
    public void destroy() {
        //关停所有的Server,作为provider将不再接收新的请求
        for (String key : new ArrayList<String>(serverMap.keySet())) {          
            //HeaderExchangeServer
            ExchangeServer server = serverMap.remove(key);
            if (server != null) {
                try {
                    if (logger.isInfoEnabled()) {
                        logger.info("Close dubbo server: "   server.getLocalAddress());
                    }
                    server.close(getServerShutdownTimeout());
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        }

        //关停所有的Client,作为consumer将不再发送新的请求
        for (String key : new ArrayList<String>(referenceClientMap.keySet())) {
            ExchangeClient client = referenceClientMap.remove(key);
            if (client != null) {
                try {
                    if (logger.isInfoEnabled()) {
                        logger.info("Close dubbo connect: "   client.getLocalAddress()   "-->"   client.getRemoteAddress());
                    }
                    client.close();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        }
        //对于幽灵客户端的处理逻辑暂时先忽略
        stubServiceMethodsMap.clear();
        super.destroy();
    }
    
    //HeaderExchangeServer.close(timeout)
    public void close(final int timeout) {
        if (timeout > 0) {
            final long max = (long) timeout;
            final long start = System.currentTimeMillis();
            if (getUrl().getParameter(Constants.CHANNEL_SEND_READONLYEVENT_KEY, false)){
                sendChannelReadOnlyEvent();
            }
            //作为server在关闭的时候很有可能仍然有任务在进行中,这时候这个timeout的时间就是用来等待相应处理结束的,每隔10ms进行一次重试,直到最后超时
            while (HeaderExchangeServer.this.isRunning() 
                    && System.currentTimeMillis() - start < max) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
        doClose();
        //NettyServer
        server.close(timeout);
    }
    //关闭处理心跳任务的定时器
    private void doClose() {
        if (closed) {
            return;
        }
        closed = true;
        stopHeartbeatTimer();
        try {
            scheduled.shutdown();
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
    }
    
    //AbstractService.close()
    //作者的本意就是在这里关闭掉业务线程池,这里提到的业务线程池也就是dubbo处理所有自定义业务使用的线程池,关闭这个线程池十分重要,但是老版本的代码在这里有BUG
    public void close(int timeout) {
        ExecutorUtil.gracefulShutdown(executor ,timeout);
        //close方法就是强制关闭业务线程池,并且关闭NettyServer中相关Channel
        close();
    }
    public static void gracefulShutdown(Executor executor, int timeout) {
        if (!(executor instanceof ExecutorService) || isShutdown(executor)) {
            return;
        }
        final ExecutorService es = (ExecutorService) executor;
        try {
            //不再接收新的任务,将原来未执行完的任务执行完
            es.shutdown();
        } catch (SecurityException ex2) {
            return ;
        } catch (NullPointerException ex2) {
            return ;
        }
        try {//如果到达timeout时间之后仍然没有关闭任务,就直接调用shutdownNow,强制关闭所有任务
            if(! es.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
                es.shutdownNow();
            }
        } catch (InterruptedException ex) {
            es.shutdownNow();
            //不要生吞InterruptedException,所以在本地调用中依然将本线程的interrupted置位,以便上层能够发现
            Thread.currentThread().interrupt();
        }
        //如果到这里都没有关闭成功的话,就重新开线程关闭业务线程池
        if (!isShutdown(es)){
            newThreadToCloseExecutor(es);
        }
    }

上面提到了一个Bug,这里简单介绍一下: 因为在关闭executor的时候,作者本意就是这里的executor就是业务线程池,但是实际上这里并不是业务线程池。原因如下: 在初始server的时候在Abstract中有这么一段代码:

代码语言:javascript复制
if (handler instanceof WrappedChannelHandler ){
   executor = ((WrappedChannelHandler)handler).getExecutor();
}

在NettyServer中对于handler包装的最后结果导致这个handler实际上是MultiMessageHandler,而MultiMessageHandler跟WrappedChannelHandler没有继承关闭,所以这里的executor实际上是null,没有引用到实际的业务线程池,所以在关闭的时候导致业务线程池没有成功关闭。这个BUG已经在后面的dubbo其他版本中修复,这里可以查看其中一种修复方法:解决方案

下面看一下ExchangeClient.destory()

代码语言:javascript复制
    public void close(int timeout) {
        doClose();
        channel.close(timeout);
    }
    //HeaderExchangeChannel.clse()
    //关闭心跳处理
    private void doClose() {
        stopHeartbeatTimer();
    }
    
    // graceful close
    public void close(int timeout) {
        if (closed) {
            return;
        }
        closed = true;
        if (timeout > 0) {
        //这里作者的本意是看一下客户端是否有发出去的请求,但是还没有收到相应的,然后等到timeout时间看请求是否返回
        //但是因为DefaultFuture在发送请求时候的key是成员变量channel,而不是HeaderExchangeChannel.this,所以这代码有BUG
            long start = System.currentTimeMillis();
            while (DefaultFuture.hasFuture(HeaderExchangeChannel.this) 
                    && System.currentTimeMillis() - start < timeout) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
        close();
    }
    
    public void close() {
        try {
            channel.close();
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }

基本上到这里,所有的清理操作都介绍的差不多了。撒花~~~

0 人点赞