这篇文章是我在评论区学到的。

2024-07-29 09:29:14 浏览数 (2)

你好呀,我是歪歪。

上周发了这篇文章《线程池遇到父子任务,有大坑,要注意!》

里面描述了一个线程池遇到父子任务的情况。

总结来说就是如果线程池的任务之间存在父子关系,那么请不要使用同一个线程池。如果使用了同一个线程池,可能会因为子任务进了队列,导致父任务一直等待,出现假死现象。

然后评论区有这样的一个评论:

comparable future 加 join,啥意思呢?

歪师傅这种经验老道的程序员一眼就 get 到了。

之前版本的代码里面,我为了让所有子任务完成后,父任务才继续执行,用 CountDownLatch 来做了一个栅栏:

其实这里也可以用“comparable future 加 join”。

我再给你一版代码:

代码语言:javascript复制
public class Main {

    public static void main(String[] args) {
        ExecutorService executorService = new ThreadPoolExecutor(5, 10,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(100));
        StopWatch watch = new StopWatch("MainTest");
        CompletableFuture[] mainFutureArr = new CompletableFuture[10];
        watch.start();
        for (int i = 0; i < 10; i  ) {
            int finalI = i;
            CompletableFuture<Void> mainFuture = CompletableFuture.runAsync(() -> {
                System.out.println("当前线程"   Thread.currentThread().getName()   ",---【任务"   finalI   "】开始执行---");
                //模拟获取数据
                List<String> arrayList = getDataFromDB();
                CompletableFuture[] subFutureArr = new CompletableFuture[arrayList.size()];
                for (int j = 0; j < arrayList.size(); j  ) {
                    subFutureArr[j] = getFuture(finalI, arrayList.get(j), executorService);
                }
                //等待所有子任务完成
                CompletableFuture.allOf(subFutureArr).join();
                System.out.println("当前线程"   Thread.currentThread().getName()   ",---【任务"   finalI   "】执行完成---");
            }, executorService);
            mainFutureArr[i] = mainFuture;
        }
        //等待所有父任务完成
        CompletableFuture.allOf(mainFutureArr).join();
        watch.stop();
        System.out.println(watch.prettyPrint());
    }

    private static CompletableFuture<Void> getFuture(int finalI, String str, ExecutorService executorService) {
        return CompletableFuture.runAsync(() -> {
            System.out.println("当前线程"   Thread.currentThread().getName()   ",【任务"   finalI   "】开始处理数据="   str);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, executorService);
    }

    private static List<String> getDataFromDB() {
        ArrayList<String> arrayList = new ArrayList<>();
        arrayList.add("1");
        arrayList.add("2");
        return arrayList;
    }
}

和之前一版代码实现的逻辑是一模一样的,只是写法不一样。

提交任务的方式变化了:

原提交任务方式:java.util.concurrent.ExecutorService#submit(java.lang.Runnable) 新提交任务方式:java.util.concurrent.CompletableFuture#runAsync(java.lang.Runnable, java.util.concurrent.Executor)

然后是等待线程的方式变化了:

原等待线程方式:java.util.concurrent.CountDownLatch#await() 新等待线程方式:java.util.concurrent.CompletableFuture#join()

其实你对比新老逻辑会发现,之前线程池是线程池,栅栏是栅栏,是我们写代码的时候把这两个组合在了一起,实现了我们的功能。

而老逻辑的两个方法都是在 CompletableFuture 里面,整体看起来确实更加直观。

虽然任何一种写法都能很好的完成需求,但是多学一种,总归是有好处的。

asyncTool

除了上面这种写法之外,还有人看了文章后悄悄给我说了一个框架:asyncTool。

我还是很听话的,于是在网上搜了一下,发现这确实是一个封装的很好的线程编排框架:

https://gitee.com/jd-platform-opensource/asyncTool

在项目的介绍里面,图示了这样的几个场景。

多个执行单元的串行请求:

多个执行单元的并行请求:

阻塞等待,串行的后面跟多个并行:

串并行相互依赖:

复杂场景:

还有很多其他的介绍,比如作者分析了好几种并发场景下可能存在的需求,你可以打开前面的链接,自己看看:

我这里只是把项目特点部分单独截个图,因为从这个介绍上来看,确实是厉害的:

解决任意的多线程并行、串行、阻塞、依赖、回调的并发框架,可以任意组合各线程的执行顺序,带全链路回调和超时控制。

这个项目拉到本地后,可以直奔 test 模块,里面有各种各样的测试案例,是摸清楚这个框架原理的一个好抓手:

比如这个测试用例,我看了一下,这两个测试用例就很适合我的需求:

parallel.TestPar#testMulti3 parallel.TestPar#testMulti3Reverse

那么问题就来了?

为什么会有两个测试用例呢,注释还一模一样?

这两个测试用例在写法上有点差异:

testMulti3 的关键方法是 next,而 testMulti3Reverse 的关键方法是 depend。

拿这个具体的需求来说:

0 执行完,同时 1 和 2 , 12 都完成后 3

用 next 是需要先构建任务 3,然后构建 1 和 2,并把它们的 next 设置为 3。最后才是构建 0,并关联上 1 和 2。

用 depend 是先构建 0,然后构建 1,2,并把它们的 depend 设置为 0。最后才是构建 3,并 depend 设置为 1,2。

有人喜欢倒着写,有人喜欢顺着写。没关系,两种写法作者都给你提供了。

这个小细节让我觉得作者是真的很用心了。

我个人喜欢顺着写,所以我就直接跑 testMulti3Reverse 这个案例了。

在跑之前,我们可以看到每个任务的执行时间都是 1s:

所以,按照我们的预期应该是项目启动后 0 先完成,然后 1,2 同时执行,1s 后 3 开始执行。

跑一把看看输出是不是这样的:

从输出结果来看,确实是先执行了 0 ,然后 1,2 同时执行,最后执行的 3,符合我们的需求。

那么底层是如何实现的呢?

有源码,也有 Demo,你直接上手盘它啊。

这个框架的入口就是这个方法:

com.jd.platform.async.executor.Async#beginWork(long, java.util.concurrent.ExecutorService, java.util.List<com.jd.platform.async.wrapper.WorkerWrapper>)

你去找到这个方法的时候,还会贴心的看到作者写的:出发点。

通过 Debug 我们可以看到,workerWrappers 这个入参就是我们的 0 号任务,并且里面也有了后续任务集合:

继续往下 Debug,你会来到这个地方:

com.jd.platform.async.wrapper.WorkerWrapper#beginNext

在这个方法里面,有个 for 循环把 1 和 2 任务扔到线程池,然后等待 1 和 2 执行完成。

你看这里的逻辑,搞个 CompletableFuture[] 数组,然后有个 for 循环,循环里面 CompletableFuture.runAsync() 方法,最后把数组放到 allOf 里面 CompletableFuture.allOf(futures) 再来个 get() 方法阻塞等待。

眼熟不眼熟?

和我们前面 Demo 里面的这段代码,不能说相差无几,只能说一模一样:

这并不是巧合,是因为不管这个 asyncTool 玩儿的多花里胡哨,最底层还是基于 CompletableFuture 来做的,所以在同样的需求下,代码是类似的。

我这里主要只是给你分享一下,让你知道有这样的一个异步线程编排框架的存在。

如果你感兴趣,可以拉一下源码,作者在写代码的时候,就留下了大量的注释,学习成本并不高:

另外,作者在 QuickStart 里面也提到了。

如果需要深入了解这个框架是如何一步一步实现的,从接到需求,到每一步的思考,每个类为什么这么设计,为什么有这些方法,也就是如何从 0 到 1 开发出这个框架,可以看看作者的这四篇文章:

https://blog.csdn.net/tianyaleixiaowu/category_9637010.html

学明白了,这就是你的了。

当你遇到一个复杂的涉及到异步任务编排的需求的时候,你就可以把这个掏出来看看,应该是能比较优雅的解决你遇到的问题。

思考

回到我们自己的 Demo 中,当我用 CompletableFuture 改造完成之后,我还发现了一个小细节。

如果你还记得前一篇文章,那你应该知道是因为父子线程使用了同一个线程池导致的。

在我使用 CompletableFuture 的写法时,如果我不指定线程池,也就是这样:

会发什么呢?

程序会正常执行完成:

那么问题就来了:为什么不指定线程池的时候,反而没有问题呢?

其实你从日志输出中也能发现端倪:

当前线程ForkJoinPool.commonPool-worker-6

这说明什么?

是不是说明 runAsync 方法内置了一个默认的线程池?

而这个默认线程池没有出现问题,那我们是不是完全有理由猜测,它这个线程池的核心线程数很大,所以没有任何一个子任务进队列。

那到底是多大呢?

我也不知道,但是我知道大力出奇迹。

所以:

我把循环扩大到了 10000 次,并且在任务里面 sleep 1s,方便任务快速占满核心线程。

在程序运行之前,你觉得线程编号会到多少去?

我寻思着怎么也得千儿八百的吧。

嘿,你猜怎么着?

有意思的事情就来了:

线程池编号从 1 开始只到了 11。

后面都是在复用这 11 个线程,且程序在运行了 917s 之后正常结束了,没有出现阻塞的情况:

这个结果确实有点让人摸不着头脑。

但是我们即有 Demo 也有源码啊。

盘它源码,扒它底裤不就完了吗。

这个 runAsync 方法一进来,我们就直接看到了答案:

java.util.concurrent.CompletableFuture#runAsync(java.lang.Runnable)

可以看到当 USE_COMMON_POOL 为 false 时会使用 new ThreadPerTaskExecutor() 方法来搞个线程池。为 true 的时候,会使用 ForkJoinPool.commonPool() 来搞个线程池。

而 ThreadPerTaskExecutor 方法非常的简单粗暴:

就是每来一个任务就起一个线程,对应我们的 Demo 那就是必然不会出现父子等待的情况。

如果在歪师傅的机器上, USE_COMMON_POOL 为 false 的话,那就直接破案了啊。

然而,很不幸,它是 true。

所以我们看看为 true 的时候,ForkJoinPool.commonPool() 是怎么个事儿。

commonPool 方法进来之后直接返回了一个 common 对象,这个对象是 ForkJoinPool 类:

重启项目之后,可以观察到这个对象是在 ForkJoinPool 类初始化的时候就生成了:

其中 parallelism 的值为 11,从命名上猜也能猜出来了,这个值和 ForkJoinPool 里工作线程数量有关。

但是 11 这个值到底是怎么来的呢?

通过代码可以看到,这个值可以通过配置进行指定,如果不指定,则获取默认值。

默认值,为 CPU 核心数减一。

我的机器是 12 核的,所以 12-1=11,它就是这样来的:

而这里的这个 11 和我们前面分析日志“线程池编号从 1 开始只到了 11”也呼应上了。

所以,同样的代码,在你的电脑上跑,可能就不是 11 了,这个小细节需要注意一下。

现在我们拿到一个 ForkJoinPool 类了,只需要搞懂它的工作原理就行了。

剩下的部分,就当是一个思考题吧。

关于这个 ForkJoinPool 部分我其实是写了一点的,我又都删了,一个原因是这部分写起来确实感觉有点难度,你去看源码就知道它的源码可读性真不高。另外一个主要的原因是因为写的过程中我翻到了一篇文章:《一次线程池引发的线上故障分析》

我们要找的答案就在这篇文章里面:

我是在查阅资料的时候看到这篇文章的,看完之后,怎么说呢?

就是说:绝了,我们遇到的问题简直就是如出一辙。

而这是一篇 2020 年的文章,四年过去了,问题还是那个问题,但是遇到这个问题的程序员都换了一批又一批了。

也许再过四年,又有一个程序员遇到了这个问题,然后在记录问题的时候,搜索到了我的这篇文章。

那我就提前给这个小伙子或者小姑娘打个招呼吧:

朋友,你好呀,我是歪歪。

·············· END ··············

0 人点赞