如何在 Spring Boot 优雅关闭加入一些自定义机制

2022-05-09 14:27:21 浏览数 (1)

个人创作公约:本人声明创作的所有文章皆为自己原创,如果有参考任何文章的地方,会标注出来,如果有疏漏,欢迎大家批判。如果大家发现网上有抄袭本文章的,欢迎举报,并且积极向这个 github 仓库 提交 issue,谢谢支持~

我们知道从 Spring Boot 2.3.x 这个版本开始,引入了优雅关闭的机制。我们也在线上部署了这个机制,来增加用户体验。虽然现在大家基本上都通过最终一致性,以及事务等机制,来保证了就算非优雅关闭,也可以保持业务正确。但是,这样总会带来短时间的数据不一致,影响用户体验。所以,引入优雅关闭,保证当前请求处理完,再开始 Destroy 所有 ApplicationContext 中的 Bean。

优雅关闭存在的问题

ApplicationContext 的关闭过程简单来说分为以下几个步骤(对应源码 AbstractApplicationContext 的 doClose 方法):

  1. 取消当前 ApplicationContext 在 LivBeanView 的注册(目前其实只包含从 JMX 上取消注册)
  2. 发布 ContextClosedEvent 事件,同步处理所有这个事件的 Listener
  3. 处理所有实现 Lifecycle 接口的 Bean,解析他们的关闭顺序,并调用他们的 stop 方法
  4. Destroy 所有 ApplicationContext 中的 Bean
  5. 关闭 BeanFactory

简单理解优雅关闭,其实就是在上面的第三步中加入优雅关闭的逻辑实现的 Lifecycle,包括如下两步:

  1. 切断外部流量入口:具体点说就是让 Spring Boot 的 Web 容器直接拒绝所有新收到的请求,不再处理新请求,例如直接返回 503.
  2. 等待承载的 Dispatcher 的线程池处理完所有请求:对于同步的 Servlet 进程其实就是处理 Servlet 请求的线程池,对于异步响应式的 WebFlux 进程其实就是所有 Web 请求的 Reactor 线程池处理完当前所有 Publisher 发布的事件。

首先,切断外部流量入口保证不再有新的请求到来,线程池处理完所有请求之后,正常的业务逻辑也是正常走完的,在这之后就可以开始关闭其他各种元素了。

但是,我们首先要保证,优雅关闭的逻辑,需要在所有的 Lifecycle 的第一个最保险。这样保证一定所有请求处理完,才会开始 stop 其他的 Lifecycle。如果不这样会有啥问题呢?举个例子,例如某个 Lifecycle 是负载均衡器的,stop 方法会关闭负载均衡器,如果这个 Lifecycle 在优雅关闭的 Lifecycle 的 stop 之前进行 stop,那么可能会造成某些在 负载均衡器 stop 后还没处理完的请求,并且这些请求需要使用负载均衡器调用其他微服务,执行失败。

优雅关闭还有另一个问题就是,默认的优雅关闭功能不是那么全面,有时候我们需要在此基础上,添加更多的关闭逻辑。例如,你的项目中不止 有 web 容器处理请求的线程池,你自己还使用了其他线程池,并且线程池可能还比较复杂,一个向另一个提交,互相提交,各种提交等等,我们需要在 web 容器处理请求的线程池处理完所有请求后,再等待这些线程池的执行完所有请求后再关闭。还有一个例子就是针对 MQ 消费者的,当优雅关闭时,其实应该停止消费新的消息,等待当前所有消息处理完。这些问题可以看下图:

源码分析接入点 - Spring Boot Undertow & 同步 Servlet 环境

我们从源码触发,分析在 Spring Boot 中使用 Undertow 作为 Web 容器并且是同步 Servlet 环境下,如果接入自定义的机制。首先,在引入 spring boot 相关依赖并且配置好优雅关闭之后:

pom.xml

代码语言:javascript复制
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <exclusions>
        <!--不使用默认的 tomcat 容器-->
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!--使用 undertow 容器-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-undertow</artifactId>
</dependency>

application.yml

代码语言:javascript复制
server:
  # 设置关闭方式为优雅关闭
  shutdown: graceful
  
management:
  endpoint:
    health:
      show-details: always
    # actuator 暴露 /actuator/shutdown 接口用于关闭(由于这里开启了优雅关闭所以其实是优雅关闭)
    shutdown:
      enabled: true
  endpoints:
    jmx:
      exposure:
        exclude: '*'
    web:
      exposure:
        include: '*'

在设置关闭方式为优雅关闭之后,Spring Boot 启动时,在创建基于 Undertow 实现的 WebServer 的时候,会添加优雅关闭的 Handler,参考源码:

UndertowWebServerFactoryDelegate

代码语言:javascript复制
static List<HttpHandlerFactory> createHttpHandlerFactories(Compression compression, boolean useForwardHeaders,
			String serverHeader, Shutdown shutdown, HttpHandlerFactory... initialHttpHandlerFactories) {
	List<HttpHandlerFactory> factories = new ArrayList<>(Arrays.asList(initialHttpHandlerFactories));
	if (compression != null && compression.getEnabled()) {
		factories.add(new CompressionHttpHandlerFactory(compression));
	}
	if (useForwardHeaders) {
		factories.add(Handlers::proxyPeerAddress);
	}
	if (StringUtils.hasText(serverHeader)) {
		factories.add((next) -> Handlers.header(next, "Server", serverHeader));
	}
	//如果指定了优雅关闭,则添加 gracefulShutdown
	if (shutdown == Shutdown.GRACEFUL) {
		factories.add(Handlers::gracefulShutdown);
	}
	return factories;
}

添加的这个 Handler 就是 Undertow 的 GracefulShutdownHandlerGracefulShutdownHandler 是一个 HttpHandler,这个接口很简单:

代码语言:javascript复制
public interface HttpHandler {
    void handleRequest(HttpServerExchange exchange) throws Exception;
}

其实就是对于收到的每个 HTTP 请求,都会经过每个 HttpHandler 的 handleRequest 方法。GracefulShutdownHandler 的实现思路也很简单,既然每个请求都会经过这个类的 handleRequest 方法,那么我就在收到请求的时候将一个原子计数器原子 1,请求处理完后(注意是返回响应之后,不是方法返回,因为请求可能是异步的,所以这个做成了回调),将原子计数器原子 - 1,如果这个计数器为零,就证明没有任何正在处理的请求了。源码是:

GracefulShutdownHandler:

代码语言:javascript复制
@Override
public void handleRequest(HttpServerExchange exchange) throws Exception {
    //原子更新,请求计数器加一,返回的 snapshot 是包含是否关闭状态位的数字
    long snapshot = stateUpdater.updateAndGet(this, incrementActive);
    //通过状态位判断是否正在关闭
    if (isShutdown(snapshot)) {
        //如果正在关闭,直接请求数原子减一
        decrementRequests();
        //设置响应码为 503
        exchange.setStatusCode(StatusCodes.SERVICE_UNAVAILABLE);
        //标记请求完成
        exchange.endExchange();
        //直接返回,不继续走其他的 HttpHandler
        return;
    }
    //添加请求完成时候的 listener,这个在请求完成返回响应时会被调用,将计数器原子减一
    exchange.addExchangeCompleteListener(listener);
    //继续走下一个 HttpHandler
    next.handleRequest(exchange);
}

那么,是什么时候调用的这个关闭呢?前面我们说过 ApplicationContext 的关闭过程的第三步:处理所有实现 Lifecycle 接口的 Bean,解析他们的关闭顺序,并调用他们的 stop 方法,其实优雅关闭就在这里被调用。当 Spring Boot Undertow & 同步 Servlet 环境启动时,到了创建 WebServer 这一步,会创建一个优雅关闭的 Lifecycle,对应源码:

ServletWebServerApplicationContext

代码语言:javascript复制
private void createWebServer() {
	WebServer webServer = this.webServer;
	ServletContext servletContext = getServletContext();
	if (webServer == null && servletContext == null) {
		StartupStep createWebServer = this.getApplicationStartup().start("spring.boot.webserver.create");
		ServletWebServerFactory factory = getWebServerFactory();
		createWebServer.tag("factory", factory.getClass().toString());
		this.webServer = factory.getWebServer(getSelfInitializer());
		createWebServer.end();
		//就是这里,创建一个 WebServerGracefulShutdownLifecycle 并注册到当前 ApplicationContext 的 BeanFactory 中
		getBeanFactory().registerSingleton("webServerGracefulShutdown",
				new WebServerGracefulShutdownLifecycle(this.webServer));
		getBeanFactory().registerSingleton("webServerStartStop",
				new WebServerStartStopLifecycle(this, this.webServer));
	}
	else if (servletContext != null) {
		try {
			getSelfInitializer().onStartup(servletContext);
		}
		catch (ServletException ex) {
			throw new ApplicationContextException("Cannot initialize servlet context", ex);
		}
	}
	initPropertySources();
}

前面说到, ApplicationContext 的关闭过程的第三步调用所有 Lifecycle 的 stop 方法,这里即 WebServerGracefulShutdownLifecycle 中的 stop 方法:

WebServerGracefulShutdownLifecycle

代码语言:javascript复制
@Override
public void stop(Runnable callback) {
	this.running = false;
	this.webServer.shutDownGracefully((result) -> callback.run());
}

这里的 webServer,由于我们使用的是 Undertow,对应实现就是 UndertowWebServer,看一下他的 shutDownGracefully 实现:

UndertowWebServer

代码语言:javascript复制
//这里的这个 GracefulShutdownHandler 就是前面说的在启动时加的 GracefulShutdownHandler
private volatile GracefulShutdownHandler gracefulShutdown;

@Override
public void shutDownGracefully(GracefulShutdownCallback callback) {
    // 如果 GracefulShutdownHandler 不为 null,证明开启了优雅关闭(server.shutdown=graceful)
	if (this.gracefulShutdown == null) {
	    //为 null,就证明没开启优雅关闭,什么都不等
		callback.shutdownComplete(GracefulShutdownResult.IMMEDIATE);
		return;
	}
	//开启优雅关闭,需要等待请求处理完
	logger.info("Commencing graceful shutdown. Waiting for active requests to complete");
	this.gracefulShutdownCallback.set(callback);
	//调用 GracefulShutdownHandler 的 shutdown 进行优雅关闭
	this.gracefulShutdown.shutdown();
	//调用 GracefulShutdownHandler 的 addShutdownListener 添加关闭后调用的操作,这里是调用 notifyGracefulCallback
	//其实就是调用方法参数的 callback(就是外部的回调)
	this.gracefulShutdown.addShutdownListener((success) -> notifyGracefulCallback(success));
}

private void notifyGracefulCallback(boolean success) {
	GracefulShutdownCallback callback = this.gracefulShutdownCallback.getAndSet(null);
	if (callback != null) {
		if (success) {
			logger.info("Graceful shutdown complete");
			callback.shutdownComplete(GracefulShutdownResult.IDLE);
		}
		else {
			logger.info("Graceful shutdown aborted with one or more requests still active");
			callback.shutdownComplete(GracefulShutdownResult.REQUESTS_ACTIVE);
		}
	}
}

再看下 GracefulShutdownHandler 的 shutdown 方法以及 addShutdownListener 方法:

GracefulShutdownHandler:

代码语言:javascript复制
public void shutdown() {
    //设置关闭状态位,并原子   1
    stateUpdater.updateAndGet(this, incrementActiveAndShutdown);
    //直接请求数原子减一
    decrementRequests();
}

private void decrementRequests() {
    long snapshot = stateUpdater.updateAndGet(this, decrementActive);
    // Shutdown has completed when the activeCount portion is zero, and shutdown is set.
    //如果与 关闭状态位 MASK 完全相等,证明其他位都是 0,证明剩余处理中的请求数量为 0
    if (snapshot == SHUTDOWN_MASK) {
        //调用 shutdownComplete
        shutdownComplete();
    }
}

private void shutdownComplete() {
    synchronized (lock) {
        lock.notifyAll();
        //调用每个 ShutdownListener 的 shutdown 方法
        for (ShutdownListener listener : shutdownListeners) {
            listener.shutdown(true);
        }
        shutdownListeners.clear();
    }
}

/**
 * 这个方法并不只是字面意思,首先如果不是关闭中不能添加 ShutdownListener
 * 然后如果没有请求了,就直接调用传入的 shutdownListener 的 shutdown 方法
 * 如果还有请求,则添加入 shutdownListeners,等其他调用 shutdownComplete 的时候遍历 shutdownListeners 调用 shutdown
 * lock 主要为了 addShutdownListener 与 shutdownComplete 对 shutdownListeners 的访问安全
 * lock 的 wait notify 主要为了实现 awaitShutdown 机制,我们这里没有提
 */
public void addShutdownListener(final ShutdownListener shutdownListener) {
        synchronized (lock) {
            if (!isShutdown(stateUpdater.get(this))) {
                throw UndertowMessages.MESSAGES.handlerNotShutdown();
            }
            long count = activeCount(stateUpdater.get(this));
            if (count == 0) {
                shutdownListener.shutdown(true);
            } else {
                shutdownListeners.add(shutdownListener);
            }
        }
    }

这就是优雅关闭的底层原理,但是我们还没有分析清楚 ApplicationContext 的关闭过程的第三步以及优雅关闭与其他 Lifecycle Bean 的 stop 先后顺序,我们这里来理清一下,首先我们看一下 Smart

开始关闭 Lifecycle Bean 的入口:

DefaultLifecycleProcessor

代码语言:javascript复制
private void stopBeans() {
    //读取所有的 Lifecycle bean,返回的是一个 LinkedHashMap,遍历它的顺序和放入的顺序一样
    //放入的顺序就是从 BeanFactory 读取所有 Lifecycle 的 Bean 的返回顺序,这个和 Bean 加载顺序有关,不太可控,可能这个版本加载顺序升级一个版本就变了
	Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();
	//按照每个 Lifecycle 的 Phase 值进行分组
	//如果实现了 Phased 接口就通过其 phase 方法返回得出 phase 值
	//如果没有实现 Phased 接口则认为 Phase 是 0
	Map<Integer, LifecycleGroup> phases = new HashMap<>();
	lifecycleBeans.forEach((beanName, bean) -> {
		int shutdownPhase = getPhase(bean);
		LifecycleGroup group = phases.get(shutdownPhase);
		if (group == null) {
			group = new LifecycleGroup(shutdownPhase, this.timeoutPerShutdownPhase, lifecycleBeans, false);
			phases.put(shutdownPhase, group);
		}
		group.add(beanName, bean);
	});
	//如果不为空,证明有需要关闭的 Lifecycle,开始关闭
	if (!phases.isEmpty()) {
	    //按照 Phase 值倒序
		List<Integer> keys = new ArrayList<>(phases.keySet());
		keys.sort(Collections.reverseOrder());
		//挨个关闭
		for (Integer key : keys) {
			phases.get(key).stop();
		}
	}
}

总结起来,其实就是:

  1. 获取当前 ApplicationContext 的 Beanfactory 中的所有实现了 Lifecycle 接口的 Bean。
  2. 读取每个 Bean 的 Phase 值,如果这个 Bean 实现了 Phased 接口,就取接口方法返回的值,如果没有实现就是 0.
  3. 按照 Phase 值将 Bean 分组
  4. 按照 Phase 值从大到小的顺序,依次遍历每组进行关闭
  5. 具体关闭每组的逻辑我们就不详细看代码了,知道关闭的时候其实还看了当前这个 Lifecycle 的 Bean 是否还依赖了其他的 Lifecycle 的 Bean,如果依赖了,优先关掉被依赖的 Lifecycle Bean

我们来看下前面提到的优雅关闭相关的 WebServerGracefulShutdownLifecycle 的 Phase 是:

代码语言:javascript复制
class WebServerGracefulShutdownLifecycle implements SmartLifecycle {
    ....
}

SmartLifecycle 包含了 Phased 接口以及默认实现:

代码语言:javascript复制
public interface SmartLifecycle extends Lifecycle, Phased {
    int DEFAULT_PHASE = Integer.MAX_VALUE;
    @Override
	default int getPhase() {
		return DEFAULT_PHASE;
	}
}

可以看出,只要实现了 SmartLifecycle,Phase 默认就是最大值。所以优雅关闭的 Lifecycle: WebServerGracefulShutdownLifecycle 的 Phase 就是最大值,也就是属于最先被关闭的那一组

总结接入点 - Spring Boot Undertow & 同步 Servlet 环境

1. 接入点一 - 通过添加实现 SmartLifecycle 接口的 Bean,指定 Phase 比 WebServerGracefulShutdownLifecycle 的 Phase 小

前面的分析中,我们已经知道了:WebServerGracefulShutdownLifecycle 的 Phase 就是最大值,也就是属于最先被关闭的那一组。我们想要实现的是在这之后加入一些优雅关闭的逻辑,同时在 Destroy Bean (前面提到的 ApplicationContext 关闭的第四步)之前(即 Bean 销毁之前,某些 Bean 销毁中就不能用了,比如微服务调用中的一些 Bean,这时候如果还有任务没完成调用他们就会报异常)。那我们首先想到的就是加入一个 Phase 在这时候的 Lifecycle,在里面实现我们的优雅关闭接入,例如:

代码语言:javascript复制
@Log4j2
@Component
public class BizThreadPoolShutdownLifecycle implements SmartLifecycle {
    private volatile boolean running = false;
    
    @Override
    public int getPhase() {
        //在 WebServerGracefulShutdownLifecycle 那一组之后
        return SmartLifecycle.DEFAULT_PHASE - 1;
    }

    @Override
    public void start() {
        this.running = true;
    }

    @Override
    public void stop() {
        //在这里编写的优雅关闭逻辑
        this.running = false;
    }

    @Override
    public boolean isRunning() {
        return running;
    }
}

这样实现兼容性比较好,并且升级底层框架依赖版本基本上不用修改。但是问题就是,可能会引入某个框架里面带 Lifecycle bean,虽然他的 Phase 是正确的,小于 WebServerGracefulShutdownLifecycle 的,但是 SmartLifecycle.DEFAULT_PHASE - 1 即等于我们自定义的 Lifecyce, 并且这个正好是需要等待我们的优雅关闭结束再关闭的,并且由于 Bean 加载顺序问题导致框架的 Lifecycle 又跑到了我们自定义的 Lifecycle 前进行 stop。这样就会有问题,但是问题出现的概率并不大。

2. 接入点二 - 通过反射向 Undertow 的 GracefulShutdownHandler 的 List<ShutdownListener> shutdownListeners 中添加 ShutdownListener 实现

这种实现方式,很明显,限定了容器必须是 undertow,并且可能升级的兼容性不好。但是可以在 Http 线程池优雅关闭后立刻执行我们的优雅关闭逻辑,不用担心引入某个依赖导致我们自定义的优雅关闭顺序有问题。与第一种孰优孰劣,请大家自行判断,简单实现是:

代码语言:javascript复制
@Log4j2
@Componenet
//仅在包含 Undertow 这个类的时候加载
@ConditionalOnClass(name = "io.undertow.Undertow")
public class ThreadPoolFactoryGracefulShutDownHandler implements ApplicationListener<ApplicationEvent> {
    
    //获取操作 UndertowWebServer 的 gracefulShutdown 字段的句柄
    private static VarHandle undertowGracefulShutdown;
    //获取操作 GracefulShutdownHandler 的 shutdownListeners 字段的句柄
    private static VarHandle undertowShutdownListeners;

    static {
        try {
            undertowGracefulShutdown = MethodHandles
                    .privateLookupIn(UndertowWebServer.class, MethodHandles.lookup())
                    .findVarHandle(UndertowWebServer.class, "gracefulShutdown",
                            GracefulShutdownHandler.class);
            undertowShutdownListeners = MethodHandles
                    .privateLookupIn(GracefulShutdownHandler.class, MethodHandles.lookup())
                    .findVarHandle(GracefulShutdownHandler.class, "shutdownListeners",
                            List.class);
        } catch (Exception e) {
            log.warn("ThreadPoolFactoryGracefulShutDownHandler undertow not found, ignore fetch var handles");
        }
    }

    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        //仅处理 WebServerInitializedEvent 事件,这个是在 WebServer 创建并初始化完成后发出的事件
        if (event instanceof WebServerInitializedEvent) {
            WebServer webServer = ((WebServerInitializedEvent) event).getWebServer();
            //检查当前的 web 容器是否是 UnderTow 的
            if (webServer instanceof UndertowWebServer) {
                GracefulShutdownHandler gracefulShutdownHandler = (GracefulShutdownHandler) undertowGracefulShutdown.getVolatile(webServer);
                //如果启用了优雅关闭,则 gracefulShutdownHandler 不为 null
                if (gracefulShutdownHandler != null) {
                    var shutdownListeners = (List<GracefulShutdownHandler.ShutdownListener>) undertowShutdownListeners.getVolatile(gracefulShutdownHandler);
                    shutdownListeners.add(shutdownSuccessful -> {
                        if (shutdownSuccessful) {
                            //添加你的优雅关闭逻辑
                        } else {
                            log.info("ThreadPoolFactoryGracefulShutDownHandler-onApplicationEvent shutdown failed");
                        }
                    });
                }
            }
        }
    }
}

如何实现额外线程池的优雅关闭

现在我们知道如何接入了,那么针对项目中的自定义线程池,如何把他们关闭呢?首先肯定是要先拿到所有要检查的线程池,这个不同环境方式不同,实现也比较简单,这里不再赘述,我们假设拿到了所有线程池,并且线程池只有以下两种实现(其实就是 JDK 中的两种线程池,忽略定时任务线程池 ScheduledThreadPoolExecutor):

  • java.util.concurrent.ThreadPoolExecutor:最常用的线程池
  • java.util.concurrent.ForkJoinPool:ForkJoin 形式的线程池

针对这两种线程池如何判断他们是否已经没有任务在执行了呢?参考代码:

代码语言:javascript复制
public static boolean isCompleted(ExecutorService executorService) {
    if (executorService instanceof ThreadPoolExecutor) {
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
        //对于 ThreadPoolExecutor,就是判断没有任何 active 的线程了
        return threadPoolExecutor.getActiveCount() == 0;
    } else if (executorService instanceof ForkJoinPool) {
        //对于 ForkJoinPool,复杂一些,就是判断既没有活跃线程,也没有运行的线程,队列里面也没有任何任务并且并没有任何等待提交的任务
        ForkJoinPool forkJoinPool = (ForkJoinPool) executorService;
        return forkJoinPool.getActiveThreadCount() == 0
                && forkJoinPool.getRunningThreadCount() == 0
                && forkJoinPool.getQueuedTaskCount() == 0
                && forkJoinPool.getQueuedSubmissionCount() == 0;
    }
    return true;
}

如何判断所有线程池都没有任务了呢?由于实际应用可能很放飞自我,比如线程池 A 可能提交任务到线程池 B,线程池 B 有可能提交任务到线程池 C,线程池 C 又有可能提交任务给 A 和 B,所以如果我们依次遍历一轮所有线程池发现上面这个方法 isCompleted 都返回 true,也是不能保证所有线程池一定运行完了的(比如我依次检查 A,B,C,检查到 C 的时候,C 又提交任务到了 A 和 B 并结束,C 检查发现任务都完成了,但是之前检查过的 A,B 又有了任务未完成)。所以我的解决办法是:打乱所有线程池,遍历,检查每个线程池是否完成,如果检查发现都完成则计数器加 1,只要有未完成的就不加并清零计数器。不断循环,每次循环 sleep 1 秒,直到计数器为 3(也就是连续三次按随机顺序检查所有线程池都没有任何任务):

代码语言:javascript复制
List<ExecutorService> executorServices = 获取所有线程池
for (int i = 0; i < 3; ) {
    //连续三次,以随机乱序检查所有的线程池都完成了,才认为是真正完成
    Collections.shuffle(executorServices);
    if (executorServices.stream().allMatch(ThreadPoolFactory::isCompleted)) {
        i  ;
        log.info("all threads pools are completed, i: {}", i);
    } else {
        //连续三次
        i = 0;
        log.info("not all threads pools are completed, wait for 1s");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException ignored) {
        }
    }
}

RocketMQ-spring-starter 中是如何处理的

rocketmq 的官方 spring boot starter:https://github.com/apache/rocketmq-spring

其中是采用我们这里说的第一种接入点方式,将消费者容器做成 SmartLifcycle(Phase 为最大值,属于最优先的关闭组),在里面加入关闭逻辑:

DefaultRocketMQListenerContainer

代码语言:javascript复制
@Override
public int getPhase() {
    // Returning Integer.MAX_VALUE only suggests that
    // we will be the first bean to shutdown and last bean to start
    return Integer.MAX_VALUE;
}
@Override
public void stop(Runnable callback) {
    stop();
    callback.run();
}
@Override
public void stop() {
    if (this.isRunning()) {
        if (Objects.nonNull(consumer)) {
            //关闭消费者
            consumer.shutdown();
        }
        setRunning(false);
    }
}

0 人点赞