你好呀,我是歪歪。
上周发了这篇文章《线程池遇到父子任务,有大坑,要注意!》
里面描述了一个线程池遇到父子任务的情况。
总结来说就是如果线程池的任务之间存在父子关系,那么请不要使用同一个线程池。如果使用了同一个线程池,可能会因为子任务进了队列,导致父任务一直等待,出现假死现象。
然后评论区有这样的一个评论:
comparable future 加 join,啥意思呢?
歪师傅这种经验老道的程序员一眼就 get 到了。
之前版本的代码里面,我为了让所有子任务完成后,父任务才继续执行,用 CountDownLatch 来做了一个栅栏:
其实这里也可以用“comparable future 加 join”。
我再给你一版代码:
代码语言:javascript复制public class Main {
public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(5, 10,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(100));
StopWatch watch = new StopWatch("MainTest");
CompletableFuture[] mainFutureArr = new CompletableFuture[10];
watch.start();
for (int i = 0; i < 10; i ) {
int finalI = i;
CompletableFuture<Void> mainFuture = CompletableFuture.runAsync(() -> {
System.out.println("当前线程" Thread.currentThread().getName() ",---【任务" finalI "】开始执行---");
//模拟获取数据
List<String> arrayList = getDataFromDB();
CompletableFuture[] subFutureArr = new CompletableFuture[arrayList.size()];
for (int j = 0; j < arrayList.size(); j ) {
subFutureArr[j] = getFuture(finalI, arrayList.get(j), executorService);
}
//等待所有子任务完成
CompletableFuture.allOf(subFutureArr).join();
System.out.println("当前线程" Thread.currentThread().getName() ",---【任务" finalI "】执行完成---");
}, executorService);
mainFutureArr[i] = mainFuture;
}
//等待所有父任务完成
CompletableFuture.allOf(mainFutureArr).join();
watch.stop();
System.out.println(watch.prettyPrint());
}
private static CompletableFuture<Void> getFuture(int finalI, String str, ExecutorService executorService) {
return CompletableFuture.runAsync(() -> {
System.out.println("当前线程" Thread.currentThread().getName() ",【任务" finalI "】开始处理数据=" str);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, executorService);
}
private static List<String> getDataFromDB() {
ArrayList<String> arrayList = new ArrayList<>();
arrayList.add("1");
arrayList.add("2");
return arrayList;
}
}
和之前一版代码实现的逻辑是一模一样的,只是写法不一样。
提交任务的方式变化了:
原提交任务方式:java.util.concurrent.ExecutorService#submit(java.lang.Runnable) 新提交任务方式:java.util.concurrent.CompletableFuture#runAsync(java.lang.Runnable, java.util.concurrent.Executor)
然后是等待线程的方式变化了:
原等待线程方式:java.util.concurrent.CountDownLatch#await() 新等待线程方式:java.util.concurrent.CompletableFuture#join()
其实你对比新老逻辑会发现,之前线程池是线程池,栅栏是栅栏,是我们写代码的时候把这两个组合在了一起,实现了我们的功能。
而老逻辑的两个方法都是在 CompletableFuture 里面,整体看起来确实更加直观。
虽然任何一种写法都能很好的完成需求,但是多学一种,总归是有好处的。
asyncTool
除了上面这种写法之外,还有人看了文章后悄悄给我说了一个框架:asyncTool。
我还是很听话的,于是在网上搜了一下,发现这确实是一个封装的很好的线程编排框架:
https://gitee.com/jd-platform-opensource/asyncTool
在项目的介绍里面,图示了这样的几个场景。
多个执行单元的串行请求:
多个执行单元的并行请求:
阻塞等待,串行的后面跟多个并行:
串并行相互依赖:
复杂场景:
还有很多其他的介绍,比如作者分析了好几种并发场景下可能存在的需求,你可以打开前面的链接,自己看看:
我这里只是把项目特点部分单独截个图,因为从这个介绍上来看,确实是厉害的:
解决任意的多线程并行、串行、阻塞、依赖、回调的并发框架,可以任意组合各线程的执行顺序,带全链路回调和超时控制。
这个项目拉到本地后,可以直奔 test 模块,里面有各种各样的测试案例,是摸清楚这个框架原理的一个好抓手:
比如这个测试用例,我看了一下,这两个测试用例就很适合我的需求:
parallel.TestPar#testMulti3 parallel.TestPar#testMulti3Reverse
那么问题就来了?
为什么会有两个测试用例呢,注释还一模一样?
这两个测试用例在写法上有点差异:
testMulti3 的关键方法是 next,而 testMulti3Reverse 的关键方法是 depend。
拿这个具体的需求来说:
0 执行完,同时 1 和 2 , 12 都完成后 3
用 next 是需要先构建任务 3,然后构建 1 和 2,并把它们的 next 设置为 3。最后才是构建 0,并关联上 1 和 2。
用 depend 是先构建 0,然后构建 1,2,并把它们的 depend 设置为 0。最后才是构建 3,并 depend 设置为 1,2。
有人喜欢倒着写,有人喜欢顺着写。没关系,两种写法作者都给你提供了。
这个小细节让我觉得作者是真的很用心了。
我个人喜欢顺着写,所以我就直接跑 testMulti3Reverse 这个案例了。
在跑之前,我们可以看到每个任务的执行时间都是 1s:
所以,按照我们的预期应该是项目启动后 0 先完成,然后 1,2 同时执行,1s 后 3 开始执行。
跑一把看看输出是不是这样的:
从输出结果来看,确实是先执行了 0 ,然后 1,2 同时执行,最后执行的 3,符合我们的需求。
那么底层是如何实现的呢?
有源码,也有 Demo,你直接上手盘它啊。
这个框架的入口就是这个方法:
com.jd.platform.async.executor.Async#beginWork(long, java.util.concurrent.ExecutorService, java.util.List<com.jd.platform.async.wrapper.WorkerWrapper>)
你去找到这个方法的时候,还会贴心的看到作者写的:出发点。
通过 Debug 我们可以看到,workerWrappers 这个入参就是我们的 0 号任务,并且里面也有了后续任务集合:
继续往下 Debug,你会来到这个地方:
com.jd.platform.async.wrapper.WorkerWrapper#beginNext
在这个方法里面,有个 for 循环把 1 和 2 任务扔到线程池,然后等待 1 和 2 执行完成。
你看这里的逻辑,搞个 CompletableFuture[] 数组,然后有个 for 循环,循环里面 CompletableFuture.runAsync() 方法,最后把数组放到 allOf 里面 CompletableFuture.allOf(futures) 再来个 get() 方法阻塞等待。
眼熟不眼熟?
和我们前面 Demo 里面的这段代码,不能说相差无几,只能说一模一样:
这并不是巧合,是因为不管这个 asyncTool 玩儿的多花里胡哨,最底层还是基于 CompletableFuture 来做的,所以在同样的需求下,代码是类似的。
我这里主要只是给你分享一下,让你知道有这样的一个异步线程编排框架的存在。
如果你感兴趣,可以拉一下源码,作者在写代码的时候,就留下了大量的注释,学习成本并不高:
另外,作者在 QuickStart 里面也提到了。
如果需要深入了解这个框架是如何一步一步实现的,从接到需求,到每一步的思考,每个类为什么这么设计,为什么有这些方法,也就是如何从 0 到 1 开发出这个框架,可以看看作者的这四篇文章:
https://blog.csdn.net/tianyaleixiaowu/category_9637010.html
学明白了,这就是你的了。
当你遇到一个复杂的涉及到异步任务编排的需求的时候,你就可以把这个掏出来看看,应该是能比较优雅的解决你遇到的问题。
思考
回到我们自己的 Demo 中,当我用 CompletableFuture 改造完成之后,我还发现了一个小细节。
如果你还记得前一篇文章,那你应该知道是因为父子线程使用了同一个线程池导致的。
在我使用 CompletableFuture 的写法时,如果我不指定线程池,也就是这样:
会发什么呢?
程序会正常执行完成:
那么问题就来了:为什么不指定线程池的时候,反而没有问题呢?
其实你从日志输出中也能发现端倪:
当前线程ForkJoinPool.commonPool-worker-6
这说明什么?
是不是说明 runAsync 方法内置了一个默认的线程池?
而这个默认线程池没有出现问题,那我们是不是完全有理由猜测,它这个线程池的核心线程数很大,所以没有任何一个子任务进队列。
那到底是多大呢?
我也不知道,但是我知道大力出奇迹。
所以:
我把循环扩大到了 10000 次,并且在任务里面 sleep 1s,方便任务快速占满核心线程。
在程序运行之前,你觉得线程编号会到多少去?
我寻思着怎么也得千儿八百的吧。
嘿,你猜怎么着?
有意思的事情就来了:
线程池编号从 1 开始只到了 11。
后面都是在复用这 11 个线程,且程序在运行了 917s 之后正常结束了,没有出现阻塞的情况:
这个结果确实有点让人摸不着头脑。
但是我们即有 Demo 也有源码啊。
盘它源码,扒它底裤不就完了吗。
这个 runAsync 方法一进来,我们就直接看到了答案:
java.util.concurrent.CompletableFuture#runAsync(java.lang.Runnable)
可以看到当 USE_COMMON_POOL 为 false 时会使用 new ThreadPerTaskExecutor() 方法来搞个线程池。为 true 的时候,会使用 ForkJoinPool.commonPool() 来搞个线程池。
而 ThreadPerTaskExecutor 方法非常的简单粗暴:
就是每来一个任务就起一个线程,对应我们的 Demo 那就是必然不会出现父子等待的情况。
如果在歪师傅的机器上, USE_COMMON_POOL 为 false 的话,那就直接破案了啊。
然而,很不幸,它是 true。
所以我们看看为 true 的时候,ForkJoinPool.commonPool() 是怎么个事儿。
commonPool 方法进来之后直接返回了一个 common 对象,这个对象是 ForkJoinPool 类:
重启项目之后,可以观察到这个对象是在 ForkJoinPool 类初始化的时候就生成了:
其中 parallelism 的值为 11,从命名上猜也能猜出来了,这个值和 ForkJoinPool 里工作线程数量有关。
但是 11 这个值到底是怎么来的呢?
通过代码可以看到,这个值可以通过配置进行指定,如果不指定,则获取默认值。
默认值,为 CPU 核心数减一。
我的机器是 12 核的,所以 12-1=11,它就是这样来的:
而这里的这个 11 和我们前面分析日志“线程池编号从 1 开始只到了 11”也呼应上了。
所以,同样的代码,在你的电脑上跑,可能就不是 11 了,这个小细节需要注意一下。
现在我们拿到一个 ForkJoinPool 类了,只需要搞懂它的工作原理就行了。
剩下的部分,就当是一个思考题吧。
关于这个 ForkJoinPool 部分我其实是写了一点的,我又都删了,一个原因是这部分写起来确实感觉有点难度,你去看源码就知道它的源码可读性真不高。另外一个主要的原因是因为写的过程中我翻到了一篇文章:《一次线程池引发的线上故障分析》。
我们要找的答案就在这篇文章里面:
我是在查阅资料的时候看到这篇文章的,看完之后,怎么说呢?
就是说:绝了,我们遇到的问题简直就是如出一辙。
而这是一篇 2020 年的文章,四年过去了,问题还是那个问题,但是遇到这个问题的程序员都换了一批又一批了。
也许再过四年,又有一个程序员遇到了这个问题,然后在记录问题的时候,搜索到了我的这篇文章。
那我就提前给这个小伙子或者小姑娘打个招呼吧:
朋友,你好呀,我是歪歪。
·············· END ··············